LCOV - code coverage report
Current view: top level - artdaq/ArtModules/RootDAQOutput-s124 - RootDAQOut_module.cc (source / functions) Coverage Total Hit
Test: artdaq.info.cleaned Lines: 0.0 % 394 0
Test Date: 2025-09-04 00:45:34 Functions: 0.0 % 79 0

            Line data    Source code
       1              : // vim: set sw=2 expandtab :
       2              : #include "TRACE/tracemf.h"  // TLOG
       3              : #include "artdaq/DAQdata/Globals.hh"
       4              : #define TRACE_NAME (app_name + "_RootDAQOut").c_str()
       5              : 
       6              : #include "artdaq/ArtModules/ArtdaqSharedMemoryServiceInterface.h"
       7              : #include "artdaq/ArtModules/RootDAQOutput-s124/RootDAQOutFile.h"
       8              : 
       9              : #include "art/Framework/Core/OutputModule.h"
      10              : #include "art/Framework/Core/RPManager.h"
      11              : #include "art/Framework/Core/ResultsProducer.h"
      12              : #include "art/Framework/IO/ClosingCriteria.h"
      13              : #include "art/Framework/IO/FileStatsCollector.h"
      14              : #include "art/Framework/IO/PostCloseFileRenamer.h"
      15              : #include "art/Framework/IO/detail/SafeFileNameConfig.h"
      16              : #include "art/Framework/IO/detail/logFileAction.h"
      17              : #include "art/Framework/IO/detail/validateFileNamePattern.h"
      18              : #include "art/Framework/Principal/EventPrincipal.h"
      19              : #include "art/Framework/Principal/ResultsPrincipal.h"
      20              : #include "art/Framework/Principal/RunPrincipal.h"
      21              : #include "art/Framework/Principal/SubRunPrincipal.h"
      22              : #include "art/Utilities/Globals.h"
      23              : #include "art/Utilities/parent_path.h"
      24              : #include "art/Utilities/unique_filename.h"
      25              : #include "art_root_io/DropMetaData.h"
      26              : #include "art_root_io/FastCloningEnabled.h"
      27              : #include "art_root_io/RootFileBlock.h"
      28              : #include "art_root_io/detail/rootOutputConfigurationTools.h"
      29              : #include "art_root_io/setup.h"
      30              : #include "canvas/Persistency/Provenance/ProductTables.h"
      31              : #include "canvas/Utilities/Exception.h"
      32              : #include "fhiclcpp/ParameterSet.h"
      33              : #include "fhiclcpp/types/Atom.h"
      34              : #include "fhiclcpp/types/ConfigurationTable.h"
      35              : #include "fhiclcpp/types/OptionalAtom.h"
      36              : #include "fhiclcpp/types/OptionalSequence.h"
      37              : #include "fhiclcpp/types/Table.h"
      38              : #include "fhiclcpp/types/TableFragment.h"
      39              : #include "messagefacility/MessageLogger/MessageLogger.h"
      40              : 
      41              : #include <memory>
      42              : #include <mutex>
      43              : #include <set>
      44              : #include <string>
      45              : #include <vector>
      46              : 
      47              : using namespace std;
      48              : using namespace hep::concurrency;
      49              : 
      50              : namespace {
      51              : string const dev_null{"/dev/null"};
      52              : 
      53            0 : bool maxCriterionSpecified(art::ClosingCriteria const& cc)
      54              : {
      55            0 :         auto fp = mem_fn(&art::ClosingCriteria::fileProperties);
      56            0 :         return (fp(cc).nEvents() !=
      57            0 :                 art::ClosingCriteria::Defaults::unsigned_max()) ||
      58            0 :                (fp(cc).nSubRuns() !=
      59            0 :                 art::ClosingCriteria::Defaults::unsigned_max()) ||
      60            0 :                (fp(cc).nRuns() != art::ClosingCriteria::Defaults::unsigned_max()) ||
      61            0 :                (fp(cc).size() != art::ClosingCriteria::Defaults::size_max()) ||
      62            0 :                (fp(cc).age().count() !=
      63            0 :                 art::ClosingCriteria::Defaults::seconds_max());
      64              : }
      65              : 
      66            0 : auto shouldFastClone(bool const fastCloningSet,
      67              :                      bool const fastCloning,
      68              :                      bool const wantAllEvents,
      69              :                      art::ClosingCriteria const& cc)
      70              : {
      71            0 :         art::FastCloningEnabled enabled;
      72            0 :         if (fastCloningSet and not fastCloning)
      73              :         {
      74            0 :                 enabled.disable(
      75              :                     "RootDAQOut configuration explicitly disables fast cloning.");
      76            0 :                 return enabled;
      77              :         }
      78              : 
      79            0 :         if (not wantAllEvents)
      80              :         {
      81            0 :                 enabled.disable(
      82              :                     "Event-selection has been specified in the RootDAQOut configuration.");
      83              :         }
      84            0 :         if (fastCloning && maxCriterionSpecified(cc) &&
      85            0 :             cc.granularity() < art::Granularity::InputFile)
      86              :         {
      87            0 :                 enabled.disable(
      88              :                     "File-switching has been requested at event, subrun, or "
      89              :                     "run boundaries.");
      90              :         }
      91            0 :         return enabled;
      92            0 : }
      93              : }  // namespace
      94              : 
      95              : namespace art {
      96              : 
      97              : class RootDAQOut final : public OutputModule
      98              : {
      99              : public:
     100              :         static constexpr char const* default_tmpDir{"<parent-path-of-filename>"};
     101              : 
     102              :         struct Config
     103              :         {
     104              :                 using Name = fhicl::Name;
     105              :                 using Comment = fhicl::Comment;
     106              :                 template<typename T>
     107              :                 using Atom = fhicl::Atom<T>;
     108              :                 template<typename T>
     109              :                 using OptionalAtom = fhicl::OptionalAtom<T>;
     110              :                 fhicl::TableFragment<OutputModule::Config> omConfig;
     111              :                 Atom<string> catalog{Name("catalog"), ""};
     112              :                 OptionalAtom<bool> dropAllEvents{Name("dropAllEvents")};
     113              :                 Atom<bool> dropAllSubRuns{Name("dropAllSubRuns"), false};
     114              :                 OptionalAtom<bool> fastCloning{Name("fastCloning")};
     115              :                 Atom<string> tmpDir{Name("tmpDir"), default_tmpDir};
     116            0 :                 Atom<int> compressionLevel{Name("compressionLevel"), 7};
     117            0 :                 Atom<unsigned> freePercent{Name("freePercent"), 0};
     118            0 :                 Atom<unsigned> freeMB{Name("freeMB"), 0};
     119              :                 Atom<int64_t> saveMemoryObjectThreshold{Name("saveMemoryObjectThreshold"),
     120            0 :                                                         -1l};
     121            0 :                 Atom<int64_t> treeMaxVirtualSize{Name("treeMaxVirtualSize"), -1};
     122            0 :                 Atom<int> splitLevel{Name("splitLevel"), 1};
     123            0 :                 Atom<int> basketSize{Name("basketSize"), 16384};
     124              :                 Atom<bool> dropMetaDataForDroppedData{Name("dropMetaDataForDroppedData"),
     125              :                                                       false};
     126              :                 Atom<string> dropMetaData{Name("dropMetaData"), "NONE"};
     127              :                 Atom<bool> writeParameterSets{Name("writeParameterSets"), true};
     128              :                 fhicl::Table<ClosingCriteria::Config> fileProperties{
     129              :                     Name("fileProperties"),
     130              :                     Comment("The 'fileProperties' parameter is specified to enable "
     131              :                             "output-file switching.")};
     132              :                 fhicl::TableFragment<detail::SafeFileNameConfig> safeFileName;
     133            0 :                 Atom<int> firstLoggerRank{Name("firstLoggerRank"), -1};
     134              : 
     135              :                 struct NewSubStringForApp
     136              :                 {
     137              :                         fhicl::Atom<string> appName{fhicl::Name("appName")};
     138              :                         fhicl::Atom<string> newString{fhicl::Name("newString")};
     139              :                 };
     140              :                 struct FileNameSubstitution
     141              :                 {
     142              :                         fhicl::Atom<string> targetString{fhicl::Name("targetString")};
     143              :                         fhicl::Sequence<fhicl::Table<NewSubStringForApp>> replacementList{fhicl::Name("replacementList")};
     144              :                 };
     145              :                 fhicl::OptionalSequence<fhicl::Table<FileNameSubstitution>> fileNameSubstitutions{Name("fileNameSubstitutions")};
     146              : 
     147            0 :                 Config()
     148            0 :                 {
     149              :                         // Both RootDAQOut module and OutputModule use the "fileName"
     150              :                         // FHiCL parameter.  However, whereas in OutputModule the
     151              :                         // parameter has a default, for RootDAQOut the parameter should
     152              :                         // not.  We therefore have to change the default flag setting
     153              :                         // for 'OutputModule::Config::fileName'.
     154              :                         using namespace fhicl::detail;
     155              :                         ParameterBase* adjustFilename{
     156            0 :                             const_cast<fhicl::Atom<string>*>(&omConfig().fileName)};  // NOLINT(cppcoreguidelines-pro-type-const-cast)
     157            0 :                         adjustFilename->set_par_style(fhicl::par_style::REQUIRED);
     158            0 :                 }
     159              : 
     160              :                 struct KeysToIgnore
     161              :                 {
     162              :                         set<string>
     163            0 :                         operator()() const
     164              :                         {
     165            0 :                                 set<string> keys{OutputModule::Config::KeysToIgnore::get()};
     166            0 :                                 keys.insert("results");
     167            0 :                                 return keys;
     168            0 :                         }
     169              :                 };
     170              :         };
     171              : 
     172              :         using Parameters = fhicl::WrappedTable<Config, Config::KeysToIgnore>;
     173              : 
     174              :         ~RootDAQOut();
     175              :         explicit RootDAQOut(Parameters const&);
     176              :         RootDAQOut(RootDAQOut const&) = delete;
     177              :         RootDAQOut(RootDAQOut&&) = delete;
     178              :         RootDAQOut& operator=(RootDAQOut const&) = delete;
     179              :         RootDAQOut& operator=(RootDAQOut&&) = delete;
     180              : 
     181              :         void postSelectProducts() override;
     182              :         void beginJob() override;
     183              :         void endJob() override;
     184              :         void beginRun(RunPrincipal const&) override;
     185              :         void endRun(RunPrincipal const&) override;
     186              :         void beginSubRun(SubRunPrincipal const&) override;
     187              :         void endSubRun(SubRunPrincipal const&) override;
     188              :         void event(EventPrincipal const&) override;
     189              : 
     190              : private:
     191              :         // Replace OutputModule Functions.
     192              :         string fileNameAtOpen() const;
     193              :         string fileNameAtClose(string const& currentFileName);
     194              :         string const& lastClosedFileName() const override;
     195              :         Granularity fileGranularity() const override;
     196              :         void openFile(FileBlock const&) override;
     197              :         void respondToOpenInputFile(FileBlock const&) override;
     198              :         void readResults(ResultsPrincipal const& resp) override;
     199              :         void respondToCloseInputFile(FileBlock const&) override;
     200              :         void incrementInputFileNumber() override;
     201              :         void write(EventPrincipal&) override;
     202              :         void writeSubRun(SubRunPrincipal&) override;
     203              :         void writeRun(RunPrincipal&) override;
     204              :         void setSubRunAuxiliaryRangeSetID(RangeSet const&) override;
     205              :         void setRunAuxiliaryRangeSetID(RangeSet const&) override;
     206              :         bool isFileOpen() const override;
     207              :         void setFileStatus(OutputFileStatus) override;
     208              :         bool requestsToCloseFile() const override;
     209              :         void startEndFile() override;
     210              :         void writeFileFormatVersion() override;
     211              :         void writeFileIndex() override;
     212              :         void writeProcessConfigurationRegistry() override;
     213              :         void writeProcessHistoryRegistry() override;
     214              :         void writeParameterSetRegistry() override;
     215              :         void writeProductDescriptionRegistry() override;
     216              :         void writeParentageRegistry() override;
     217              :         void doWriteFileCatalogMetadata(
     218              :             FileCatalogMetadata::collection_type const& md,
     219              :             FileCatalogMetadata::collection_type const& ssmd) override;
     220              :         void writeProductDependencies() override;
     221              :         void finishEndFile() override;
     222              :         void doRegisterProducts(ProductDescriptions& productsToProduce,
     223              :                                 ModuleDescription const& md) override;
     224              :         std::string modifyFilePattern(std::string const& /*inputPattern*/, Config const& /*config*/);
     225              : 
     226              :         // Implementation Details.
     227              :         void doOpenFile();
     228              : 
     229              :         // Data Members.
     230              :         mutable std::recursive_mutex mutex_;
     231              :         string const catalog_;
     232              :         bool dropAllEvents_{false};
     233              :         bool dropAllSubRuns_;
     234              :         string const moduleLabel_;
     235              :         int inputFileCount_{};
     236              :         unique_ptr<RootDAQOutFile> rootOutputFile_{nullptr};
     237              :         FileStatsCollector fstats_;
     238              :         PostCloseFileRenamer fRenamer_;
     239              :         string const filePattern_;
     240              :         string tmpDir_;
     241              :         string lastClosedFileName_{};
     242              :         int const compressionLevel_;
     243              :         unsigned freePercent_;
     244              :         unsigned freeMB_;
     245              :         int64_t const saveMemoryObjectThreshold_;
     246              :         int64_t const treeMaxVirtualSize_;
     247              :         int const splitLevel_;
     248              :         int const basketSize_;
     249              :         DropMetaData dropMetaData_;
     250              :         bool dropMetaDataForDroppedData_;
     251              :         FastCloningEnabled fastCloningEnabled_{};
     252              :         // Set false only for cases where we are guaranteed never to need historical
     253              :         // ParameterSet information in the downstream file, such as when mixing.
     254              :         bool writeParameterSets_;
     255              :         ClosingCriteria fileProperties_;
     256              :         ProductDescriptions productsToProduce_{};
     257              :         ProductTables producedResultsProducts_{ProductTables::invalid()};
     258              :         RPManager rpm_;
     259              : };
     260              : 
     261            0 : RootDAQOut::~RootDAQOut() = default;
     262              : 
     263            0 : RootDAQOut::RootDAQOut(Parameters const& config)
     264              :     : OutputModule{
     265            0 :           config().omConfig}
     266            0 :     , catalog_{config().catalog()}
     267            0 :     , dropAllSubRuns_{config().dropAllSubRuns()}
     268            0 :     , moduleLabel_{config.get_PSet().get<string>("module_label")}
     269            0 :     , fstats_{moduleLabel_, processName()}
     270            0 :     , fRenamer_{fstats_}
     271            0 :     , filePattern_{modifyFilePattern(config().omConfig().fileName(), config())}
     272            0 :     , tmpDir_{config().tmpDir() == default_tmpDir ? parent_path(filePattern_) : config().tmpDir()}
     273            0 :     , compressionLevel_{config().compressionLevel()}
     274            0 :     , freePercent_{config().freePercent()}
     275            0 :     , freeMB_{config().freeMB()}
     276            0 :     , saveMemoryObjectThreshold_{config().saveMemoryObjectThreshold()}
     277            0 :     , treeMaxVirtualSize_{config().treeMaxVirtualSize()}
     278            0 :     , splitLevel_{config().splitLevel()}
     279            0 :     , basketSize_{config().basketSize()}
     280            0 :     , dropMetaData_{config().dropMetaData()}
     281            0 :     , dropMetaDataForDroppedData_{config().dropMetaDataForDroppedData()}
     282            0 :     , writeParameterSets_{config().writeParameterSets()}
     283            0 :     , fileProperties_{config().fileProperties()}
     284            0 :     , rpm_{config.get_PSet()}
     285              : {
     286            0 :         TLOG(TLVL_INFO) << "RootDAQOut_module (s124 version) CONSTRUCTOR Start";
     287            0 :         bool const check_filename = config.get_PSet().has_key("fileProperties") and
     288            0 :                                     config().safeFileName().checkFileName();
     289            0 :         detail::validateFileNamePattern(check_filename, filePattern_);
     290              : 
     291              :         // Setup the streamers and error handlers.
     292            0 :         root::setup();
     293              : 
     294            0 :         bool const dropAllEventsSet{config().dropAllEvents(dropAllEvents_)};
     295            0 :         dropAllEvents_ = detail::shouldDropEvents(
     296            0 :             dropAllEventsSet, dropAllEvents_, dropAllSubRuns_);
     297              :         // N.B. Any time file switching is enabled at a boundary other than
     298              :         //      InputFile, fastCloningEnabled_ ***MUST*** be deactivated.  This is
     299              :         //      to ensure that the Event tree from the InputFile is not
     300              :         //      accidentally cloned to the output file before the output
     301              :         //      module has seen the events that are going to be processed.
     302            0 :         bool fastCloningEnabled{true};
     303            0 :         bool const fastCloningSet{config().fastCloning(fastCloningEnabled)};
     304            0 :         fastCloningEnabled_ = shouldFastClone(
     305            0 :             fastCloningSet, fastCloningEnabled, wantAllEvents(), fileProperties_);
     306              : 
     307            0 :         if (auto const n = Globals::instance()->nschedules(); n > 1)
     308              :         {
     309            0 :                 std::ostringstream oss;
     310            0 :                 oss << "More than one schedule (" << n << ") is being used.";
     311            0 :                 fastCloningEnabled_.disable(oss.str());
     312            0 :         }
     313              : 
     314            0 :         if (!writeParameterSets_)
     315              :         {
     316            0 :                 mf::LogWarning("PROVENANCE")
     317            0 :                     << "Output module " << moduleLabel_
     318            0 :                     << " has parameter writeParameterSets set to false.\n"
     319              :                     << "Parameter set provenance will not be available in subsequent "
     320            0 :                        "jobs.\n"
     321              :                     << "Check your experiment's policy on this issue to avoid future "
     322            0 :                        "problems\n"
     323            0 :                     << "with analysis reproducibility.\n";
     324              :         }
     325            0 : }
     326              : 
     327            0 : void RootDAQOut::openFile(FileBlock const& fb)
     328              : {
     329            0 :         std::lock_guard sentry{mutex_};
     330              :         // Note: The file block here refers to the currently open
     331              :         //       input file, so we can find out about the available
     332              :         //       products by looping over the branches of the input
     333              :         //       file data trees.
     334            0 :         if (!isFileOpen())
     335              :         {
     336            0 :                 doOpenFile();
     337            0 :                 respondToOpenInputFile(fb);
     338              :         }
     339            0 : }
     340              : 
     341            0 : void RootDAQOut::postSelectProducts()
     342              : {
     343            0 :         std::lock_guard sentry{mutex_};
     344            0 :         if (isFileOpen())
     345              :         {
     346            0 :                 rootOutputFile_->selectProducts();
     347              :         }
     348            0 : }
     349              : 
     350            0 : void RootDAQOut::respondToOpenInputFile(FileBlock const& fb)
     351              : {
     352            0 :         std::lock_guard sentry{mutex_};
     353            0 :         ++inputFileCount_;
     354            0 :         if (!isFileOpen())
     355              :         {
     356            0 :                 return;
     357              :         }
     358            0 :         auto const* rfb = dynamic_cast<RootFileBlock const*>(&fb);
     359            0 :         auto fastCloneThisOne = fastCloningEnabled_;
     360            0 :         if (!rfb)
     361              :         {
     362            0 :                 fastCloneThisOne.disable("Input source does not read art/ROOT files.");
     363              :         }
     364              :         else
     365              :         {
     366            0 :                 fastCloneThisOne.merge(rfb->fastClonable());
     367              :         }
     368            0 :         rootOutputFile_->beginInputFile(rfb, std::move(fastCloneThisOne));
     369            0 :         fstats_.recordInputFile(fb.fileName());
     370            0 : }
     371              : 
     372            0 : void RootDAQOut::readResults(ResultsPrincipal const& resp)
     373              : {
     374            0 :         std::lock_guard sentry{mutex_};
     375            0 :         rpm_.for_each_RPWorker(
     376            0 :             [&resp](RPWorker& w) { w.rp().doReadResults(resp); });
     377            0 : }
     378              : 
     379            0 : void RootDAQOut::respondToCloseInputFile(FileBlock const& fb)
     380              : {
     381            0 :         std::lock_guard sentry{mutex_};
     382            0 :         if (isFileOpen())
     383              :         {
     384            0 :                 rootOutputFile_->respondToCloseInputFile(fb);
     385              :         }
     386            0 : }
     387              : 
     388            0 : void RootDAQOut::write(EventPrincipal& ep)
     389              : {
     390            0 :         std::lock_guard sentry{mutex_};
     391            0 :         if (dropAllEvents_)
     392              :         {
     393            0 :                 return;
     394              :         }
     395            0 :         if (hasNewlyDroppedBranch()[InEvent])
     396              :         {
     397            0 :                 ep.addToProcessHistory();
     398            0 :                 ep.refreshProcessHistoryID();
     399              :         }
     400            0 :         rootOutputFile_->writeOne(ep);
     401            0 :         fstats_.recordEvent(ep.eventID());
     402            0 : }
     403              : 
     404            0 : void RootDAQOut::setSubRunAuxiliaryRangeSetID(RangeSet const& rs)
     405              : {
     406            0 :         std::lock_guard sentry{mutex_};
     407            0 :         rootOutputFile_->setSubRunAuxiliaryRangeSetID(rs);
     408            0 : }
     409              : 
     410            0 : void RootDAQOut::writeSubRun(SubRunPrincipal& sr)
     411              : {
     412            0 :         std::lock_guard sentry{mutex_};
     413            0 :         if (dropAllSubRuns_)
     414              :         {
     415            0 :                 return;
     416              :         }
     417            0 :         if (hasNewlyDroppedBranch()[InSubRun])
     418              :         {
     419            0 :                 sr.addToProcessHistory();
     420              :         }
     421            0 :         rootOutputFile_->writeSubRun(sr);
     422            0 :         fstats_.recordSubRun(sr.subRunID());
     423            0 : }
     424              : 
     425            0 : void RootDAQOut::setRunAuxiliaryRangeSetID(RangeSet const& rs)
     426              : {
     427            0 :         std::lock_guard sentry{mutex_};
     428            0 :         rootOutputFile_->setRunAuxiliaryRangeSetID(rs);
     429            0 : }
     430              : 
     431            0 : void RootDAQOut::writeRun(RunPrincipal& rp)
     432              : {
     433            0 :         std::lock_guard sentry{mutex_};
     434            0 :         if (hasNewlyDroppedBranch()[InRun])
     435              :         {
     436            0 :                 rp.addToProcessHistory();
     437              :         }
     438            0 :         rootOutputFile_->writeRun(rp);
     439            0 :         fstats_.recordRun(rp.runID());
     440            0 : }
     441              : 
     442            0 : void RootDAQOut::startEndFile()
     443              : {
     444            0 :         std::lock_guard sentry{mutex_};
     445              :         auto resp = make_unique<ResultsPrincipal>(
     446            0 :             ResultsAuxiliary{}, moduleDescription().processConfiguration(), nullptr);
     447            0 :         resp->createGroupsForProducedProducts(producedResultsProducts_);
     448            0 :         resp->enableLookupOfProducedProducts();
     449            0 :         if (!producedResultsProducts_.descriptions(InResults).empty() ||
     450            0 :             hasNewlyDroppedBranch()[InResults])
     451              :         {
     452            0 :                 resp->addToProcessHistory();
     453              :         }
     454            0 :         rpm_.for_each_RPWorker(
     455            0 :             [&resp](RPWorker& w) { w.rp().doWriteResults(*resp); });
     456            0 :         rootOutputFile_->writeResults(*resp);
     457            0 : }
     458              : 
     459            0 : void RootDAQOut::writeFileFormatVersion()
     460              : {
     461            0 :         std::lock_guard sentry{mutex_};
     462            0 :         rootOutputFile_->writeFileFormatVersion();
     463            0 : }
     464              : 
     465            0 : void RootDAQOut::writeFileIndex()
     466              : {
     467            0 :         std::lock_guard sentry{mutex_};
     468            0 :         rootOutputFile_->writeFileIndex();
     469            0 : }
     470              : 
     471            0 : void RootDAQOut::writeProcessConfigurationRegistry()
     472              : {
     473            0 :         std::lock_guard sentry{mutex_};
     474            0 :         rootOutputFile_->writeProcessConfigurationRegistry();
     475            0 : }
     476              : 
     477            0 : void RootDAQOut::writeProcessHistoryRegistry()
     478              : {
     479            0 :         std::lock_guard sentry{mutex_};
     480            0 :         rootOutputFile_->writeProcessHistoryRegistry();
     481            0 : }
     482              : 
     483            0 : void RootDAQOut::writeParameterSetRegistry()
     484              : {
     485            0 :         std::lock_guard sentry{mutex_};
     486            0 :         if (writeParameterSets_)
     487              :         {
     488            0 :                 rootOutputFile_->writeParameterSetRegistry();
     489              :         }
     490            0 : }
     491              : 
     492            0 : void RootDAQOut::writeProductDescriptionRegistry()
     493              : {
     494            0 :         std::lock_guard sentry{mutex_};
     495            0 :         rootOutputFile_->writeProductDescriptionRegistry();
     496            0 : }
     497              : 
     498            0 : void RootDAQOut::writeParentageRegistry()
     499              : {
     500            0 :         std::lock_guard sentry{mutex_};
     501            0 :         rootOutputFile_->writeParentageRegistry();
     502            0 : }
     503              : 
     504            0 : void RootDAQOut::doWriteFileCatalogMetadata(
     505              :     FileCatalogMetadata::collection_type const& md,
     506              :     FileCatalogMetadata::collection_type const& ssmd)
     507              : {
     508            0 :         std::lock_guard sentry{mutex_};
     509            0 :         rootOutputFile_->writeFileCatalogMetadata(fstats_, md, ssmd);
     510            0 : }
     511              : 
     512            0 : void RootDAQOut::writeProductDependencies()
     513              : {
     514            0 :         std::lock_guard sentry{mutex_};
     515            0 :         rootOutputFile_->writeProductDependencies();
     516            0 : }
     517              : 
     518            0 : void RootDAQOut::finishEndFile()
     519              : {
     520            0 :         std::lock_guard sentry{mutex_};
     521            0 :         string const currentFileName{rootOutputFile_->currentFileName()};
     522            0 :         rootOutputFile_->writeTTrees();
     523            0 :         rootOutputFile_.reset();
     524            0 :         fstats_.recordFileClose();
     525            0 :         lastClosedFileName_ = fileNameAtClose(currentFileName);
     526            0 :         TLOG(TLVL_INFO) << __func__ << ": Closed output file \"" << lastClosedFileName_ << "\"";
     527            0 :         rpm_.invoke(&ResultsProducer::doClear);
     528            0 : }
     529              : 
     530            0 : void RootDAQOut::doRegisterProducts(ProductDescriptions& producedProducts,
     531              :                                     ModuleDescription const& md)
     532              : {
     533            0 :         std::lock_guard sentry{mutex_};
     534              :         // Register Results products from ResultsProducers.
     535            0 :         rpm_.for_each_RPWorker([&producedProducts, &md](RPWorker& w) {
     536            0 :                 auto const& params = w.params();
     537            0 :                 w.setModuleDescription(
     538            0 :                     ModuleDescription{params.rpPSetID,
     539            0 :                                       params.rpPluginType,
     540            0 :                                       md.moduleLabel() + '#' + params.rpLabel,
     541              :                                       ModuleThreadingType::legacy,
     542              :                                       md.processConfiguration()});
     543            0 :                 w.rp().registerProducts(producedProducts, w.moduleDescription());
     544            0 :         });
     545              :         // Form product table for Results products.  We do this here so we
     546              :         // can appropriately set the product tables for the
     547              :         // ResultsPrincipal.
     548            0 :         productsToProduce_ = producedProducts;
     549            0 :         producedResultsProducts_ = ProductTables{productsToProduce_};
     550            0 : }
     551              : 
     552            0 : void RootDAQOut::setFileStatus(OutputFileStatus const ofs)
     553              : {
     554            0 :         std::lock_guard sentry{mutex_};
     555            0 :         if (isFileOpen())
     556              :         {
     557            0 :                 rootOutputFile_->setFileStatus(ofs);
     558              :         }
     559            0 : }
     560              : 
     561            0 : bool RootDAQOut::isFileOpen() const
     562              : {
     563            0 :         std::lock_guard sentry{mutex_};
     564            0 :         return rootOutputFile_.get() != nullptr;
     565            0 : }
     566              : 
     567            0 : void RootDAQOut::incrementInputFileNumber()
     568              : {
     569            0 :         std::lock_guard sentry{mutex_};
     570            0 :         if (isFileOpen())
     571              :         {
     572            0 :                 rootOutputFile_->incrementInputFileNumber();
     573              :         }
     574            0 : }
     575              : 
     576            0 : bool RootDAQOut::requestsToCloseFile() const
     577              : {
     578            0 :         std::lock_guard sentry{mutex_};
     579            0 :         return isFileOpen() ? rootOutputFile_->requestsToCloseFile() : false;
     580            0 : }
     581              : 
     582              : Granularity
     583            0 : RootDAQOut::fileGranularity() const
     584              : {
     585            0 :         std::lock_guard sentry{mutex_};
     586            0 :         return fileProperties_.granularity();
     587            0 : }
     588              : 
     589            0 : void RootDAQOut::doOpenFile()
     590              : {
     591            0 :         std::lock_guard sentry{mutex_};
     592            0 :         if (inputFileCount_ == 0)
     593              :         {
     594            0 :                 throw Exception(errors::LogicError)  // NOLINT(cert-err60-cpp)
     595              :                     << "Attempt to open output file before input file. "
     596            0 :                     << "Please report this to the core framework developers.\n";
     597              :         }
     598            0 :         rootOutputFile_ = make_unique<RootDAQOutFile>(this,
     599            0 :                                                       fileNameAtOpen(),
     600            0 :                                                       fileProperties_,
     601            0 :                                                       compressionLevel_,
     602            0 :                                                       freePercent_,
     603            0 :                                                       freeMB_,
     604            0 :                                                       saveMemoryObjectThreshold_,
     605            0 :                                                       treeMaxVirtualSize_,
     606            0 :                                                       splitLevel_,
     607            0 :                                                       basketSize_,
     608            0 :                                                       dropMetaData_,
     609            0 :                                                       dropMetaDataForDroppedData_);
     610            0 :         fstats_.recordFileOpen();
     611            0 :         TLOG(TLVL_INFO) << __func__ << ": Opened output file with pattern \"" << filePattern_ << "\"";
     612            0 : }
     613              : 
     614              : string
     615            0 : RootDAQOut::fileNameAtOpen() const
     616              : {
     617            0 :         return (filePattern_ == dev_null) ? dev_null : unique_filename(tmpDir_ + "/RootDAQOut");
     618              : }
     619              : 
     620              : string
     621            0 : RootDAQOut::fileNameAtClose(std::string const& currentFileName)
     622              : {
     623            0 :         return (filePattern_ == dev_null) ? dev_null : fRenamer_.maybeRenameFile(currentFileName, filePattern_);
     624              : }
     625              : 
     626              : string const&
     627            0 : RootDAQOut::lastClosedFileName() const
     628              : {
     629            0 :         std::lock_guard sentry{mutex_};
     630            0 :         if (lastClosedFileName_.empty())
     631              :         {
     632            0 :                 throw Exception(errors::LogicError, "RootDAQOut::currentFileName(): ")  // NOLINT(cert-err60-cpp)
     633            0 :                     << "called before meaningful.\n";
     634              :         }
     635            0 :         return lastClosedFileName_;
     636            0 : }
     637              : 
     638            0 : void RootDAQOut::beginJob()
     639              : {
     640            0 :         std::lock_guard sentry{mutex_};
     641            0 :         rpm_.invoke(&ResultsProducer::doBeginJob);
     642            0 : }
     643              : 
     644            0 : void RootDAQOut::endJob()
     645              : {
     646            0 :         std::lock_guard sentry{mutex_};
     647            0 :         rpm_.invoke(&ResultsProducer::doEndJob);
     648            0 : }
     649              : 
     650            0 : void RootDAQOut::event(EventPrincipal const& ep)
     651              : {
     652            0 :         std::lock_guard sentry{mutex_};
     653            0 :         rpm_.for_each_RPWorker([&ep](RPWorker& w) { w.rp().doEvent(ep); });
     654            0 : }
     655              : 
     656            0 : void RootDAQOut::beginSubRun(SubRunPrincipal const& srp)
     657              : {
     658            0 :         std::lock_guard sentry{mutex_};
     659            0 :         rpm_.for_each_RPWorker([&srp](RPWorker& w) { w.rp().doBeginSubRun(srp); });
     660            0 : }
     661              : 
     662            0 : void RootDAQOut::endSubRun(SubRunPrincipal const& srp)
     663              : {
     664            0 :         std::lock_guard sentry{mutex_};
     665            0 :         rpm_.for_each_RPWorker([&srp](RPWorker& w) { w.rp().doEndSubRun(srp); });
     666            0 : }
     667              : 
     668            0 : void RootDAQOut::beginRun(RunPrincipal const& rp)
     669              : {
     670            0 :         std::lock_guard sentry{mutex_};
     671            0 :         rpm_.for_each_RPWorker([&rp](RPWorker& w) { w.rp().doBeginRun(rp); });
     672            0 : }
     673              : 
     674            0 : void RootDAQOut::endRun(RunPrincipal const& rp)
     675              : {
     676            0 :         std::lock_guard sentry{mutex_};
     677            0 :         rpm_.for_each_RPWorker([&rp](RPWorker& w) { w.rp().doEndRun(rp); });
     678            0 : }
     679              : 
     680              : std::string
     681            0 : RootDAQOut::modifyFilePattern(std::string const& inputPattern, Config const& config)
     682              : {
     683              :         // Make sure that the shared memory is connected
     684            0 :         art::ServiceHandle<ArtdaqSharedMemoryServiceInterface> shm;
     685              : 
     686            0 :         TLOG(TLVL_DEBUG + 32) << __func__ << ": inputPattern=\"" << inputPattern << "\"";
     687              : 
     688              :         // fetch the firstLoggerRank and fileNameSubstitutions (if provided) for use in
     689              :         // substituting keywords in the filename pattern
     690            0 :         int firstLoggerRank = config.firstLoggerRank();
     691            0 :         std::vector<Config::FileNameSubstitution> subs;
     692            0 :         config.fileNameSubstitutions(subs);
     693            0 :         TLOG(TLVL_DEBUG + 33) << __func__ << ": firstLoggerRank=" << firstLoggerRank
     694            0 :                               << ", numberOfSubstitutionsProvided=" << subs.size();
     695              : 
     696              :         // initialization
     697            0 :         std::string modifiedPattern = inputPattern;
     698            0 :         std::string searchString;
     699              :         size_t targetLocation;
     700            0 :         int zeroBasedRelativeRank = my_rank;
     701            0 :         int oneBasedRelativeRank = my_rank + 1;
     702            0 :         if (firstLoggerRank >= 0)
     703              :         {
     704            0 :                 zeroBasedRelativeRank -= firstLoggerRank;
     705            0 :                 oneBasedRelativeRank -= firstLoggerRank;
     706              :         }
     707            0 :         TLOG(TLVL_DEBUG + 33) << __func__ << ": my_rank=" << my_rank << ", zeroBasedRelativeRank=" << zeroBasedRelativeRank
     708            0 :                               << ", oneBasedRelativeRank=" << oneBasedRelativeRank;
     709              : 
     710              :         // if the "ZeroBasedRelativeRank" keyword was specified in the filename pattern,
     711              :         // perform the substitution
     712            0 :         searchString = "${ZeroBasedRelativeRank}";
     713            0 :         targetLocation = modifiedPattern.find(searchString);
     714            0 :         TLOG(TLVL_DEBUG + 33) << __func__ << ":" << __LINE__ << " searchString=" << searchString << ", targetLocation=" << targetLocation;
     715            0 :         while (targetLocation != std::string::npos)
     716              :         {
     717            0 :                 std::ostringstream oss;
     718            0 :                 oss << zeroBasedRelativeRank;
     719            0 :                 modifiedPattern.replace(targetLocation, searchString.length(), oss.str());
     720            0 :                 targetLocation = modifiedPattern.find(searchString);
     721            0 :                 TLOG(TLVL_DEBUG + 33) << __func__ << ":" << __LINE__ << " searchString=" << searchString << ", targetLocation=" << targetLocation;
     722            0 :         }
     723              : 
     724              :         // if the "OneBasedRelativeRank" keyword was specified in the filename pattern,
     725              :         // perform the substitution
     726            0 :         searchString = "${OneBasedRelativeRank}";
     727            0 :         targetLocation = modifiedPattern.find(searchString);
     728            0 :         TLOG(TLVL_DEBUG + 33) << __func__ << ":" << __LINE__ << " searchString=" << searchString << ", targetLocation=" << targetLocation;
     729            0 :         while (targetLocation != std::string::npos)
     730              :         {
     731            0 :                 std::ostringstream oss;
     732            0 :                 oss << oneBasedRelativeRank;
     733            0 :                 modifiedPattern.replace(targetLocation, searchString.length(), oss.str());
     734            0 :                 targetLocation = modifiedPattern.find(searchString);
     735            0 :                 TLOG(TLVL_DEBUG + 33) << __func__ << ":" << __LINE__ << " searchString=" << searchString << ", targetLocation=" << targetLocation;
     736            0 :         }
     737              : 
     738              :         // if the "Rank" keyword was specified in the filename pattern,
     739              :         // perform the substitution
     740            0 :         searchString = "${Rank}";
     741            0 :         targetLocation = modifiedPattern.find(searchString);
     742            0 :         TLOG(TLVL_DEBUG + 33) << __func__ << ":" << __LINE__ << " searchString=" << searchString << ", targetLocation=" << targetLocation;
     743            0 :         while (targetLocation != std::string::npos)
     744              :         {
     745            0 :                 std::ostringstream oss;
     746            0 :                 oss << my_rank;
     747            0 :                 modifiedPattern.replace(targetLocation, searchString.length(), oss.str());
     748            0 :                 targetLocation = modifiedPattern.find(searchString);
     749            0 :                 TLOG(TLVL_DEBUG + 33) << __func__ << ":" << __LINE__ << " searchString=" << searchString << ", targetLocation=" << targetLocation;
     750            0 :         }
     751              : 
     752              :         // if the "Rank" keyword was specified in the filename pattern,
     753              :         // perform the substitution
     754            0 :         searchString = "${app_name}";
     755            0 :         targetLocation = modifiedPattern.find(searchString);
     756            0 :         TLOG(TLVL_DEBUG + 33) << __func__ << ":" << __LINE__ << " searchString=" << searchString << ", targetLocation=" << targetLocation;
     757            0 :         while (targetLocation != std::string::npos)
     758              :         {
     759            0 :                 std::ostringstream oss;
     760            0 :                 oss << artdaq::Globals::app_name_;
     761            0 :                 modifiedPattern.replace(targetLocation, searchString.length(), oss.str());
     762            0 :                 targetLocation = modifiedPattern.find(searchString);
     763            0 :                 TLOG(TLVL_DEBUG + 33) << __func__ << ":" << __LINE__ << " searchString=" << searchString << ", targetLocation=" << targetLocation;
     764            0 :         }
     765              : 
     766              :         // if one or more free-form substitutions were provided, we'll do them here
     767            0 :         for (auto& sub : subs)
     768              :         {
     769              :                 // first look up the replacement string for this process's app_name
     770            0 :                 const std::string BLAH = "none_provided";
     771            0 :                 std::string newString = BLAH;
     772            0 :                 std::vector<Config::NewSubStringForApp> replacementList = sub.replacementList();
     773            0 :                 for (auto& rdx : replacementList)
     774              :                 {
     775            0 :                         if (rdx.appName() == artdaq::Globals::app_name_)
     776              :                         {
     777            0 :                                 newString = rdx.newString();
     778            0 :                                 break;
     779              :                         }
     780              :                 }
     781            0 :                 TLOG(TLVL_DEBUG + 33) << __func__ << ": app_name=" << artdaq::Globals::app_name_ << ", newString=" << newString;
     782            0 :                 if (newString != BLAH)
     783              :                 {
     784              :                         // first, add the expected surrounding text, and search for that
     785            0 :                         searchString = "${" + sub.targetString() + "}";
     786            0 :                         targetLocation = modifiedPattern.find(searchString);
     787            0 :                         TLOG(TLVL_DEBUG + 33) << __func__ << ":" << __LINE__ << " searchString=" << searchString << ", targetLocation=" << targetLocation;
     788            0 :                         while (targetLocation != std::string::npos)
     789              :                         {
     790            0 :                                 modifiedPattern.replace(targetLocation, searchString.length(), newString);
     791            0 :                                 targetLocation = modifiedPattern.find(searchString);
     792            0 :                                 TLOG(TLVL_DEBUG + 33) << __func__ << ":" << __LINE__ << " searchString=" << searchString << ", targetLocation=" << targetLocation;
     793              :                         }
     794              : 
     795              :                         // then, search for the provided string, verbatim, in case the user specified
     796              :                         // the enclosing text in the configuration document
     797            0 :                         searchString = sub.targetString();
     798            0 :                         targetLocation = modifiedPattern.find(searchString);
     799            0 :                         TLOG(TLVL_DEBUG + 33) << __func__ << ":" << __LINE__ << " searchString=" << searchString << ", targetLocation=" << targetLocation;
     800            0 :                         while (targetLocation != std::string::npos)
     801              :                         {
     802            0 :                                 modifiedPattern.replace(targetLocation, searchString.length(), newString);
     803            0 :                                 targetLocation = modifiedPattern.find(searchString);
     804            0 :                                 TLOG(TLVL_DEBUG + 33) << __func__ << ":" << __LINE__ << " searchString=" << searchString << ", targetLocation=" << targetLocation;
     805              :                         }
     806              :                 }
     807            0 :         }
     808              : 
     809            0 :         TLOG(TLVL_DEBUG + 32) << __func__ << ": modifiedPattern = \"" << modifiedPattern << "\"";
     810            0 :         return modifiedPattern;
     811            0 : }
     812              : 
     813              : }  // namespace art
     814              : 
// Expands art's DEFINE_ART_MODULE macro for art::RootDAQOut.
// NOTE(review): presumably this is the plugin-registration hook that lets the art framework load the module -- confirm against art's module macro header.
     815            0 : DEFINE_ART_MODULE(art::RootDAQOut)  // NOLINT(performance-unnecessary-value-param)
        

Generated by: LCOV version 2.0-1