Line data Source code
1 : // vim: set sw=2 expandtab :
2 : #include "TRACE/tracemf.h" // TLOG
3 : #include "artdaq/DAQdata/Globals.hh"
4 : #define TRACE_NAME (app_name + "_RootDAQOut").c_str()
5 :
6 : #include "artdaq/ArtModules/ArtdaqSharedMemoryServiceInterface.h"
7 : #include "artdaq/ArtModules/RootDAQOutput-s124/RootDAQOutFile.h"
8 :
9 : #include "art/Framework/Core/OutputModule.h"
10 : #include "art/Framework/Core/RPManager.h"
11 : #include "art/Framework/Core/ResultsProducer.h"
12 : #include "art/Framework/IO/ClosingCriteria.h"
13 : #include "art/Framework/IO/FileStatsCollector.h"
14 : #include "art/Framework/IO/PostCloseFileRenamer.h"
15 : #include "art/Framework/IO/detail/SafeFileNameConfig.h"
16 : #include "art/Framework/IO/detail/logFileAction.h"
17 : #include "art/Framework/IO/detail/validateFileNamePattern.h"
18 : #include "art/Framework/Principal/EventPrincipal.h"
19 : #include "art/Framework/Principal/ResultsPrincipal.h"
20 : #include "art/Framework/Principal/RunPrincipal.h"
21 : #include "art/Framework/Principal/SubRunPrincipal.h"
22 : #include "art/Utilities/Globals.h"
23 : #include "art/Utilities/parent_path.h"
24 : #include "art/Utilities/unique_filename.h"
25 : #include "art_root_io/DropMetaData.h"
26 : #include "art_root_io/FastCloningEnabled.h"
27 : #include "art_root_io/RootFileBlock.h"
28 : #include "art_root_io/detail/rootOutputConfigurationTools.h"
29 : #include "art_root_io/setup.h"
30 : #include "canvas/Persistency/Provenance/ProductTables.h"
31 : #include "canvas/Utilities/Exception.h"
32 : #include "fhiclcpp/ParameterSet.h"
33 : #include "fhiclcpp/types/Atom.h"
34 : #include "fhiclcpp/types/ConfigurationTable.h"
35 : #include "fhiclcpp/types/OptionalAtom.h"
36 : #include "fhiclcpp/types/OptionalSequence.h"
37 : #include "fhiclcpp/types/Table.h"
38 : #include "fhiclcpp/types/TableFragment.h"
39 : #include "messagefacility/MessageLogger/MessageLogger.h"
40 :
41 : #include <memory>
42 : #include <mutex>
43 : #include <set>
44 : #include <string>
45 : #include <vector>
46 :
47 : using namespace std;
48 : using namespace hep::concurrency;
49 :
50 : namespace {
51 : string const dev_null{"/dev/null"};
52 :
53 0 : bool maxCriterionSpecified(art::ClosingCriteria const& cc)
54 : {
55 0 : auto fp = mem_fn(&art::ClosingCriteria::fileProperties);
56 0 : return (fp(cc).nEvents() !=
57 0 : art::ClosingCriteria::Defaults::unsigned_max()) ||
58 0 : (fp(cc).nSubRuns() !=
59 0 : art::ClosingCriteria::Defaults::unsigned_max()) ||
60 0 : (fp(cc).nRuns() != art::ClosingCriteria::Defaults::unsigned_max()) ||
61 0 : (fp(cc).size() != art::ClosingCriteria::Defaults::size_max()) ||
62 0 : (fp(cc).age().count() !=
63 0 : art::ClosingCriteria::Defaults::seconds_max());
64 : }
65 :
66 0 : auto shouldFastClone(bool const fastCloningSet,
67 : bool const fastCloning,
68 : bool const wantAllEvents,
69 : art::ClosingCriteria const& cc)
70 : {
71 0 : art::FastCloningEnabled enabled;
72 0 : if (fastCloningSet and not fastCloning)
73 : {
74 0 : enabled.disable(
75 : "RootDAQOut configuration explicitly disables fast cloning.");
76 0 : return enabled;
77 : }
78 :
79 0 : if (not wantAllEvents)
80 : {
81 0 : enabled.disable(
82 : "Event-selection has been specified in the RootDAQOut configuration.");
83 : }
84 0 : if (fastCloning && maxCriterionSpecified(cc) &&
85 0 : cc.granularity() < art::Granularity::InputFile)
86 : {
87 0 : enabled.disable(
88 : "File-switching has been requested at event, subrun, or "
89 : "run boundaries.");
90 : }
91 0 : return enabled;
92 0 : }
93 : } // namespace
94 :
95 : namespace art {
96 :
97 : class RootDAQOut final : public OutputModule
98 : {
99 : public:
100 : static constexpr char const* default_tmpDir{"<parent-path-of-filename>"};
101 :
102 : struct Config
103 : {
104 : using Name = fhicl::Name;
105 : using Comment = fhicl::Comment;
106 : template<typename T>
107 : using Atom = fhicl::Atom<T>;
108 : template<typename T>
109 : using OptionalAtom = fhicl::OptionalAtom<T>;
110 : fhicl::TableFragment<OutputModule::Config> omConfig;
111 : Atom<string> catalog{Name("catalog"), ""};
112 : OptionalAtom<bool> dropAllEvents{Name("dropAllEvents")};
113 : Atom<bool> dropAllSubRuns{Name("dropAllSubRuns"), false};
114 : OptionalAtom<bool> fastCloning{Name("fastCloning")};
115 : Atom<string> tmpDir{Name("tmpDir"), default_tmpDir};
116 0 : Atom<int> compressionLevel{Name("compressionLevel"), 7};
117 0 : Atom<unsigned> freePercent{Name("freePercent"), 0};
118 0 : Atom<unsigned> freeMB{Name("freeMB"), 0};
119 : Atom<int64_t> saveMemoryObjectThreshold{Name("saveMemoryObjectThreshold"),
120 0 : -1l};
121 0 : Atom<int64_t> treeMaxVirtualSize{Name("treeMaxVirtualSize"), -1};
122 0 : Atom<int> splitLevel{Name("splitLevel"), 1};
123 0 : Atom<int> basketSize{Name("basketSize"), 16384};
124 : Atom<bool> dropMetaDataForDroppedData{Name("dropMetaDataForDroppedData"),
125 : false};
126 : Atom<string> dropMetaData{Name("dropMetaData"), "NONE"};
127 : Atom<bool> writeParameterSets{Name("writeParameterSets"), true};
128 : fhicl::Table<ClosingCriteria::Config> fileProperties{
129 : Name("fileProperties"),
130 : Comment("The 'fileProperties' parameter is specified to enable "
131 : "output-file switching.")};
132 : fhicl::TableFragment<detail::SafeFileNameConfig> safeFileName;
133 0 : Atom<int> firstLoggerRank{Name("firstLoggerRank"), -1};
134 :
135 : struct NewSubStringForApp
136 : {
137 : fhicl::Atom<string> appName{fhicl::Name("appName")};
138 : fhicl::Atom<string> newString{fhicl::Name("newString")};
139 : };
140 : struct FileNameSubstitution
141 : {
142 : fhicl::Atom<string> targetString{fhicl::Name("targetString")};
143 : fhicl::Sequence<fhicl::Table<NewSubStringForApp>> replacementList{fhicl::Name("replacementList")};
144 : };
145 : fhicl::OptionalSequence<fhicl::Table<FileNameSubstitution>> fileNameSubstitutions{Name("fileNameSubstitutions")};
146 :
147 0 : Config()
148 0 : {
149 : // Both RootDAQOut module and OutputModule use the "fileName"
150 : // FHiCL parameter. However, whereas in OutputModule the
151 : // parameter has a default, for RootDAQOut the parameter should
152 : // not. We therefore have to change the default flag setting
153 : // for 'OutputModule::Config::fileName'.
154 : using namespace fhicl::detail;
155 : ParameterBase* adjustFilename{
156 0 : const_cast<fhicl::Atom<string>*>(&omConfig().fileName)}; // NOLINT(cppcoreguidelines-pro-type-const-cast)
157 0 : adjustFilename->set_par_style(fhicl::par_style::REQUIRED);
158 0 : }
159 :
160 : struct KeysToIgnore
161 : {
162 : set<string>
163 0 : operator()() const
164 : {
165 0 : set<string> keys{OutputModule::Config::KeysToIgnore::get()};
166 0 : keys.insert("results");
167 0 : return keys;
168 0 : }
169 : };
170 : };
171 :
172 : using Parameters = fhicl::WrappedTable<Config, Config::KeysToIgnore>;
173 :
174 : ~RootDAQOut();
175 : explicit RootDAQOut(Parameters const&);
176 : RootDAQOut(RootDAQOut const&) = delete;
177 : RootDAQOut(RootDAQOut&&) = delete;
178 : RootDAQOut& operator=(RootDAQOut const&) = delete;
179 : RootDAQOut& operator=(RootDAQOut&&) = delete;
180 :
181 : void postSelectProducts() override;
182 : void beginJob() override;
183 : void endJob() override;
184 : void beginRun(RunPrincipal const&) override;
185 : void endRun(RunPrincipal const&) override;
186 : void beginSubRun(SubRunPrincipal const&) override;
187 : void endSubRun(SubRunPrincipal const&) override;
188 : void event(EventPrincipal const&) override;
189 :
190 : private:
191 : // Replace OutputModule Functions.
192 : string fileNameAtOpen() const;
193 : string fileNameAtClose(string const& currentFileName);
194 : string const& lastClosedFileName() const override;
195 : Granularity fileGranularity() const override;
196 : void openFile(FileBlock const&) override;
197 : void respondToOpenInputFile(FileBlock const&) override;
198 : void readResults(ResultsPrincipal const& resp) override;
199 : void respondToCloseInputFile(FileBlock const&) override;
200 : void incrementInputFileNumber() override;
201 : void write(EventPrincipal&) override;
202 : void writeSubRun(SubRunPrincipal&) override;
203 : void writeRun(RunPrincipal&) override;
204 : void setSubRunAuxiliaryRangeSetID(RangeSet const&) override;
205 : void setRunAuxiliaryRangeSetID(RangeSet const&) override;
206 : bool isFileOpen() const override;
207 : void setFileStatus(OutputFileStatus) override;
208 : bool requestsToCloseFile() const override;
209 : void startEndFile() override;
210 : void writeFileFormatVersion() override;
211 : void writeFileIndex() override;
212 : void writeProcessConfigurationRegistry() override;
213 : void writeProcessHistoryRegistry() override;
214 : void writeParameterSetRegistry() override;
215 : void writeProductDescriptionRegistry() override;
216 : void writeParentageRegistry() override;
217 : void doWriteFileCatalogMetadata(
218 : FileCatalogMetadata::collection_type const& md,
219 : FileCatalogMetadata::collection_type const& ssmd) override;
220 : void writeProductDependencies() override;
221 : void finishEndFile() override;
222 : void doRegisterProducts(ProductDescriptions& productsToProduce,
223 : ModuleDescription const& md) override;
224 : std::string modifyFilePattern(std::string const& /*inputPattern*/, Config const& /*config*/);
225 :
226 : // Implementation Details.
227 : void doOpenFile();
228 :
229 : // Data Members.
230 : mutable std::recursive_mutex mutex_;
231 : string const catalog_;
232 : bool dropAllEvents_{false};
233 : bool dropAllSubRuns_;
234 : string const moduleLabel_;
235 : int inputFileCount_{};
236 : unique_ptr<RootDAQOutFile> rootOutputFile_{nullptr};
237 : FileStatsCollector fstats_;
238 : PostCloseFileRenamer fRenamer_;
239 : string const filePattern_;
240 : string tmpDir_;
241 : string lastClosedFileName_{};
242 : int const compressionLevel_;
243 : unsigned freePercent_;
244 : unsigned freeMB_;
245 : int64_t const saveMemoryObjectThreshold_;
246 : int64_t const treeMaxVirtualSize_;
247 : int const splitLevel_;
248 : int const basketSize_;
249 : DropMetaData dropMetaData_;
250 : bool dropMetaDataForDroppedData_;
251 : FastCloningEnabled fastCloningEnabled_{};
252 : // Set false only for cases where we are guaranteed never to need historical
253 : // ParameterSet information in the downstream file, such as when mixing.
254 : bool writeParameterSets_;
255 : ClosingCriteria fileProperties_;
256 : ProductDescriptions productsToProduce_{};
257 : ProductTables producedResultsProducts_{ProductTables::invalid()};
258 : RPManager rpm_;
259 : };
260 :
261 0 : RootDAQOut::~RootDAQOut() = default;
262 :
263 0 : RootDAQOut::RootDAQOut(Parameters const& config)
264 : : OutputModule{
265 0 : config().omConfig}
266 0 : , catalog_{config().catalog()}
267 0 : , dropAllSubRuns_{config().dropAllSubRuns()}
268 0 : , moduleLabel_{config.get_PSet().get<string>("module_label")}
269 0 : , fstats_{moduleLabel_, processName()}
270 0 : , fRenamer_{fstats_}
271 0 : , filePattern_{modifyFilePattern(config().omConfig().fileName(), config())}
272 0 : , tmpDir_{config().tmpDir() == default_tmpDir ? parent_path(filePattern_) : config().tmpDir()}
273 0 : , compressionLevel_{config().compressionLevel()}
274 0 : , freePercent_{config().freePercent()}
275 0 : , freeMB_{config().freeMB()}
276 0 : , saveMemoryObjectThreshold_{config().saveMemoryObjectThreshold()}
277 0 : , treeMaxVirtualSize_{config().treeMaxVirtualSize()}
278 0 : , splitLevel_{config().splitLevel()}
279 0 : , basketSize_{config().basketSize()}
280 0 : , dropMetaData_{config().dropMetaData()}
281 0 : , dropMetaDataForDroppedData_{config().dropMetaDataForDroppedData()}
282 0 : , writeParameterSets_{config().writeParameterSets()}
283 0 : , fileProperties_{config().fileProperties()}
284 0 : , rpm_{config.get_PSet()}
285 : {
286 0 : TLOG(TLVL_INFO) << "RootDAQOut_module (s124 version) CONSTRUCTOR Start";
287 0 : bool const check_filename = config.get_PSet().has_key("fileProperties") and
288 0 : config().safeFileName().checkFileName();
289 0 : detail::validateFileNamePattern(check_filename, filePattern_);
290 :
291 : // Setup the streamers and error handlers.
292 0 : root::setup();
293 :
294 0 : bool const dropAllEventsSet{config().dropAllEvents(dropAllEvents_)};
295 0 : dropAllEvents_ = detail::shouldDropEvents(
296 0 : dropAllEventsSet, dropAllEvents_, dropAllSubRuns_);
297 : // N.B. Any time file switching is enabled at a boundary other than
298 : // InputFile, fastCloningEnabled_ ***MUST*** be deactivated. This is
299 : // to ensure that the Event tree from the InputFile is not
300 : // accidentally cloned to the output file before the output
301 : // module has seen the events that are going to be processed.
302 0 : bool fastCloningEnabled{true};
303 0 : bool const fastCloningSet{config().fastCloning(fastCloningEnabled)};
304 0 : fastCloningEnabled_ = shouldFastClone(
305 0 : fastCloningSet, fastCloningEnabled, wantAllEvents(), fileProperties_);
306 :
307 0 : if (auto const n = Globals::instance()->nschedules(); n > 1)
308 : {
309 0 : std::ostringstream oss;
310 0 : oss << "More than one schedule (" << n << ") is being used.";
311 0 : fastCloningEnabled_.disable(oss.str());
312 0 : }
313 :
314 0 : if (!writeParameterSets_)
315 : {
316 0 : mf::LogWarning("PROVENANCE")
317 0 : << "Output module " << moduleLabel_
318 0 : << " has parameter writeParameterSets set to false.\n"
319 : << "Parameter set provenance will not be available in subsequent "
320 0 : "jobs.\n"
321 : << "Check your experiment's policy on this issue to avoid future "
322 0 : "problems\n"
323 0 : << "with analysis reproducibility.\n";
324 : }
325 0 : }
326 :
327 0 : void RootDAQOut::openFile(FileBlock const& fb)
328 : {
329 0 : std::lock_guard sentry{mutex_};
330 : // Note: The file block here refers to the currently open
331 : // input file, so we can find out about the available
332 : // products by looping over the branches of the input
333 : // file data trees.
334 0 : if (!isFileOpen())
335 : {
336 0 : doOpenFile();
337 0 : respondToOpenInputFile(fb);
338 : }
339 0 : }
340 :
341 0 : void RootDAQOut::postSelectProducts()
342 : {
343 0 : std::lock_guard sentry{mutex_};
344 0 : if (isFileOpen())
345 : {
346 0 : rootOutputFile_->selectProducts();
347 : }
348 0 : }
349 :
350 0 : void RootDAQOut::respondToOpenInputFile(FileBlock const& fb)
351 : {
352 0 : std::lock_guard sentry{mutex_};
353 0 : ++inputFileCount_;
354 0 : if (!isFileOpen())
355 : {
356 0 : return;
357 : }
358 0 : auto const* rfb = dynamic_cast<RootFileBlock const*>(&fb);
359 0 : auto fastCloneThisOne = fastCloningEnabled_;
360 0 : if (!rfb)
361 : {
362 0 : fastCloneThisOne.disable("Input source does not read art/ROOT files.");
363 : }
364 : else
365 : {
366 0 : fastCloneThisOne.merge(rfb->fastClonable());
367 : }
368 0 : rootOutputFile_->beginInputFile(rfb, std::move(fastCloneThisOne));
369 0 : fstats_.recordInputFile(fb.fileName());
370 0 : }
371 :
372 0 : void RootDAQOut::readResults(ResultsPrincipal const& resp)
373 : {
374 0 : std::lock_guard sentry{mutex_};
375 0 : rpm_.for_each_RPWorker(
376 0 : [&resp](RPWorker& w) { w.rp().doReadResults(resp); });
377 0 : }
378 :
379 0 : void RootDAQOut::respondToCloseInputFile(FileBlock const& fb)
380 : {
381 0 : std::lock_guard sentry{mutex_};
382 0 : if (isFileOpen())
383 : {
384 0 : rootOutputFile_->respondToCloseInputFile(fb);
385 : }
386 0 : }
387 :
388 0 : void RootDAQOut::write(EventPrincipal& ep)
389 : {
390 0 : std::lock_guard sentry{mutex_};
391 0 : if (dropAllEvents_)
392 : {
393 0 : return;
394 : }
395 0 : if (hasNewlyDroppedBranch()[InEvent])
396 : {
397 0 : ep.addToProcessHistory();
398 0 : ep.refreshProcessHistoryID();
399 : }
400 0 : rootOutputFile_->writeOne(ep);
401 0 : fstats_.recordEvent(ep.eventID());
402 0 : }
403 :
404 0 : void RootDAQOut::setSubRunAuxiliaryRangeSetID(RangeSet const& rs)
405 : {
406 0 : std::lock_guard sentry{mutex_};
407 0 : rootOutputFile_->setSubRunAuxiliaryRangeSetID(rs);
408 0 : }
409 :
410 0 : void RootDAQOut::writeSubRun(SubRunPrincipal& sr)
411 : {
412 0 : std::lock_guard sentry{mutex_};
413 0 : if (dropAllSubRuns_)
414 : {
415 0 : return;
416 : }
417 0 : if (hasNewlyDroppedBranch()[InSubRun])
418 : {
419 0 : sr.addToProcessHistory();
420 : }
421 0 : rootOutputFile_->writeSubRun(sr);
422 0 : fstats_.recordSubRun(sr.subRunID());
423 0 : }
424 :
425 0 : void RootDAQOut::setRunAuxiliaryRangeSetID(RangeSet const& rs)
426 : {
427 0 : std::lock_guard sentry{mutex_};
428 0 : rootOutputFile_->setRunAuxiliaryRangeSetID(rs);
429 0 : }
430 :
431 0 : void RootDAQOut::writeRun(RunPrincipal& rp)
432 : {
433 0 : std::lock_guard sentry{mutex_};
434 0 : if (hasNewlyDroppedBranch()[InRun])
435 : {
436 0 : rp.addToProcessHistory();
437 : }
438 0 : rootOutputFile_->writeRun(rp);
439 0 : fstats_.recordRun(rp.runID());
440 0 : }
441 :
442 0 : void RootDAQOut::startEndFile()
443 : {
444 0 : std::lock_guard sentry{mutex_};
445 : auto resp = make_unique<ResultsPrincipal>(
446 0 : ResultsAuxiliary{}, moduleDescription().processConfiguration(), nullptr);
447 0 : resp->createGroupsForProducedProducts(producedResultsProducts_);
448 0 : resp->enableLookupOfProducedProducts();
449 0 : if (!producedResultsProducts_.descriptions(InResults).empty() ||
450 0 : hasNewlyDroppedBranch()[InResults])
451 : {
452 0 : resp->addToProcessHistory();
453 : }
454 0 : rpm_.for_each_RPWorker(
455 0 : [&resp](RPWorker& w) { w.rp().doWriteResults(*resp); });
456 0 : rootOutputFile_->writeResults(*resp);
457 0 : }
458 :
459 0 : void RootDAQOut::writeFileFormatVersion()
460 : {
461 0 : std::lock_guard sentry{mutex_};
462 0 : rootOutputFile_->writeFileFormatVersion();
463 0 : }
464 :
465 0 : void RootDAQOut::writeFileIndex()
466 : {
467 0 : std::lock_guard sentry{mutex_};
468 0 : rootOutputFile_->writeFileIndex();
469 0 : }
470 :
471 0 : void RootDAQOut::writeProcessConfigurationRegistry()
472 : {
473 0 : std::lock_guard sentry{mutex_};
474 0 : rootOutputFile_->writeProcessConfigurationRegistry();
475 0 : }
476 :
477 0 : void RootDAQOut::writeProcessHistoryRegistry()
478 : {
479 0 : std::lock_guard sentry{mutex_};
480 0 : rootOutputFile_->writeProcessHistoryRegistry();
481 0 : }
482 :
483 0 : void RootDAQOut::writeParameterSetRegistry()
484 : {
485 0 : std::lock_guard sentry{mutex_};
486 0 : if (writeParameterSets_)
487 : {
488 0 : rootOutputFile_->writeParameterSetRegistry();
489 : }
490 0 : }
491 :
492 0 : void RootDAQOut::writeProductDescriptionRegistry()
493 : {
494 0 : std::lock_guard sentry{mutex_};
495 0 : rootOutputFile_->writeProductDescriptionRegistry();
496 0 : }
497 :
498 0 : void RootDAQOut::writeParentageRegistry()
499 : {
500 0 : std::lock_guard sentry{mutex_};
501 0 : rootOutputFile_->writeParentageRegistry();
502 0 : }
503 :
504 0 : void RootDAQOut::doWriteFileCatalogMetadata(
505 : FileCatalogMetadata::collection_type const& md,
506 : FileCatalogMetadata::collection_type const& ssmd)
507 : {
508 0 : std::lock_guard sentry{mutex_};
509 0 : rootOutputFile_->writeFileCatalogMetadata(fstats_, md, ssmd);
510 0 : }
511 :
512 0 : void RootDAQOut::writeProductDependencies()
513 : {
514 0 : std::lock_guard sentry{mutex_};
515 0 : rootOutputFile_->writeProductDependencies();
516 0 : }
517 :
518 0 : void RootDAQOut::finishEndFile()
519 : {
520 0 : std::lock_guard sentry{mutex_};
521 0 : string const currentFileName{rootOutputFile_->currentFileName()};
522 0 : rootOutputFile_->writeTTrees();
523 0 : rootOutputFile_.reset();
524 0 : fstats_.recordFileClose();
525 0 : lastClosedFileName_ = fileNameAtClose(currentFileName);
526 0 : TLOG(TLVL_INFO) << __func__ << ": Closed output file \"" << lastClosedFileName_ << "\"";
527 0 : rpm_.invoke(&ResultsProducer::doClear);
528 0 : }
529 :
530 0 : void RootDAQOut::doRegisterProducts(ProductDescriptions& producedProducts,
531 : ModuleDescription const& md)
532 : {
533 0 : std::lock_guard sentry{mutex_};
534 : // Register Results products from ResultsProducers.
535 0 : rpm_.for_each_RPWorker([&producedProducts, &md](RPWorker& w) {
536 0 : auto const& params = w.params();
537 0 : w.setModuleDescription(
538 0 : ModuleDescription{params.rpPSetID,
539 0 : params.rpPluginType,
540 0 : md.moduleLabel() + '#' + params.rpLabel,
541 : ModuleThreadingType::legacy,
542 : md.processConfiguration()});
543 0 : w.rp().registerProducts(producedProducts, w.moduleDescription());
544 0 : });
545 : // Form product table for Results products. We do this here so we
546 : // can appropriately set the product tables for the
547 : // ResultsPrincipal.
548 0 : productsToProduce_ = producedProducts;
549 0 : producedResultsProducts_ = ProductTables{productsToProduce_};
550 0 : }
551 :
552 0 : void RootDAQOut::setFileStatus(OutputFileStatus const ofs)
553 : {
554 0 : std::lock_guard sentry{mutex_};
555 0 : if (isFileOpen())
556 : {
557 0 : rootOutputFile_->setFileStatus(ofs);
558 : }
559 0 : }
560 :
561 0 : bool RootDAQOut::isFileOpen() const
562 : {
563 0 : std::lock_guard sentry{mutex_};
564 0 : return rootOutputFile_.get() != nullptr;
565 0 : }
566 :
567 0 : void RootDAQOut::incrementInputFileNumber()
568 : {
569 0 : std::lock_guard sentry{mutex_};
570 0 : if (isFileOpen())
571 : {
572 0 : rootOutputFile_->incrementInputFileNumber();
573 : }
574 0 : }
575 :
576 0 : bool RootDAQOut::requestsToCloseFile() const
577 : {
578 0 : std::lock_guard sentry{mutex_};
579 0 : return isFileOpen() ? rootOutputFile_->requestsToCloseFile() : false;
580 0 : }
581 :
582 : Granularity
583 0 : RootDAQOut::fileGranularity() const
584 : {
585 0 : std::lock_guard sentry{mutex_};
586 0 : return fileProperties_.granularity();
587 0 : }
588 :
589 0 : void RootDAQOut::doOpenFile()
590 : {
591 0 : std::lock_guard sentry{mutex_};
592 0 : if (inputFileCount_ == 0)
593 : {
594 0 : throw Exception(errors::LogicError) // NOLINT(cert-err60-cpp)
595 : << "Attempt to open output file before input file. "
596 0 : << "Please report this to the core framework developers.\n";
597 : }
598 0 : rootOutputFile_ = make_unique<RootDAQOutFile>(this,
599 0 : fileNameAtOpen(),
600 0 : fileProperties_,
601 0 : compressionLevel_,
602 0 : freePercent_,
603 0 : freeMB_,
604 0 : saveMemoryObjectThreshold_,
605 0 : treeMaxVirtualSize_,
606 0 : splitLevel_,
607 0 : basketSize_,
608 0 : dropMetaData_,
609 0 : dropMetaDataForDroppedData_);
610 0 : fstats_.recordFileOpen();
611 0 : TLOG(TLVL_INFO) << __func__ << ": Opened output file with pattern \"" << filePattern_ << "\"";
612 0 : }
613 :
614 : string
615 0 : RootDAQOut::fileNameAtOpen() const
616 : {
617 0 : return (filePattern_ == dev_null) ? dev_null : unique_filename(tmpDir_ + "/RootDAQOut");
618 : }
619 :
620 : string
621 0 : RootDAQOut::fileNameAtClose(std::string const& currentFileName)
622 : {
623 0 : return (filePattern_ == dev_null) ? dev_null : fRenamer_.maybeRenameFile(currentFileName, filePattern_);
624 : }
625 :
626 : string const&
627 0 : RootDAQOut::lastClosedFileName() const
628 : {
629 0 : std::lock_guard sentry{mutex_};
630 0 : if (lastClosedFileName_.empty())
631 : {
632 0 : throw Exception(errors::LogicError, "RootDAQOut::currentFileName(): ") // NOLINT(cert-err60-cpp)
633 0 : << "called before meaningful.\n";
634 : }
635 0 : return lastClosedFileName_;
636 0 : }
637 :
638 0 : void RootDAQOut::beginJob()
639 : {
640 0 : std::lock_guard sentry{mutex_};
641 0 : rpm_.invoke(&ResultsProducer::doBeginJob);
642 0 : }
643 :
644 0 : void RootDAQOut::endJob()
645 : {
646 0 : std::lock_guard sentry{mutex_};
647 0 : rpm_.invoke(&ResultsProducer::doEndJob);
648 0 : }
649 :
650 0 : void RootDAQOut::event(EventPrincipal const& ep)
651 : {
652 0 : std::lock_guard sentry{mutex_};
653 0 : rpm_.for_each_RPWorker([&ep](RPWorker& w) { w.rp().doEvent(ep); });
654 0 : }
655 :
656 0 : void RootDAQOut::beginSubRun(SubRunPrincipal const& srp)
657 : {
658 0 : std::lock_guard sentry{mutex_};
659 0 : rpm_.for_each_RPWorker([&srp](RPWorker& w) { w.rp().doBeginSubRun(srp); });
660 0 : }
661 :
662 0 : void RootDAQOut::endSubRun(SubRunPrincipal const& srp)
663 : {
664 0 : std::lock_guard sentry{mutex_};
665 0 : rpm_.for_each_RPWorker([&srp](RPWorker& w) { w.rp().doEndSubRun(srp); });
666 0 : }
667 :
668 0 : void RootDAQOut::beginRun(RunPrincipal const& rp)
669 : {
670 0 : std::lock_guard sentry{mutex_};
671 0 : rpm_.for_each_RPWorker([&rp](RPWorker& w) { w.rp().doBeginRun(rp); });
672 0 : }
673 :
674 0 : void RootDAQOut::endRun(RunPrincipal const& rp)
675 : {
676 0 : std::lock_guard sentry{mutex_};
677 0 : rpm_.for_each_RPWorker([&rp](RPWorker& w) { w.rp().doEndRun(rp); });
678 0 : }
679 :
680 : std::string
681 0 : RootDAQOut::modifyFilePattern(std::string const& inputPattern, Config const& config)
682 : {
683 : // Make sure that the shared memory is connected
684 0 : art::ServiceHandle<ArtdaqSharedMemoryServiceInterface> shm;
685 :
686 0 : TLOG(TLVL_DEBUG + 32) << __func__ << ": inputPattern=\"" << inputPattern << "\"";
687 :
688 : // fetch the firstLoggerRank and fileNameSubstitutions (if provided) for use in
689 : // substituting keywords in the filename pattern
690 0 : int firstLoggerRank = config.firstLoggerRank();
691 0 : std::vector<Config::FileNameSubstitution> subs;
692 0 : config.fileNameSubstitutions(subs);
693 0 : TLOG(TLVL_DEBUG + 33) << __func__ << ": firstLoggerRank=" << firstLoggerRank
694 0 : << ", numberOfSubstitutionsProvided=" << subs.size();
695 :
696 : // initialization
697 0 : std::string modifiedPattern = inputPattern;
698 0 : std::string searchString;
699 : size_t targetLocation;
700 0 : int zeroBasedRelativeRank = my_rank;
701 0 : int oneBasedRelativeRank = my_rank + 1;
702 0 : if (firstLoggerRank >= 0)
703 : {
704 0 : zeroBasedRelativeRank -= firstLoggerRank;
705 0 : oneBasedRelativeRank -= firstLoggerRank;
706 : }
707 0 : TLOG(TLVL_DEBUG + 33) << __func__ << ": my_rank=" << my_rank << ", zeroBasedRelativeRank=" << zeroBasedRelativeRank
708 0 : << ", oneBasedRelativeRank=" << oneBasedRelativeRank;
709 :
710 : // if the "ZeroBasedRelativeRank" keyword was specified in the filename pattern,
711 : // perform the substitution
712 0 : searchString = "${ZeroBasedRelativeRank}";
713 0 : targetLocation = modifiedPattern.find(searchString);
714 0 : TLOG(TLVL_DEBUG + 33) << __func__ << ":" << __LINE__ << " searchString=" << searchString << ", targetLocation=" << targetLocation;
715 0 : while (targetLocation != std::string::npos)
716 : {
717 0 : std::ostringstream oss;
718 0 : oss << zeroBasedRelativeRank;
719 0 : modifiedPattern.replace(targetLocation, searchString.length(), oss.str());
720 0 : targetLocation = modifiedPattern.find(searchString);
721 0 : TLOG(TLVL_DEBUG + 33) << __func__ << ":" << __LINE__ << " searchString=" << searchString << ", targetLocation=" << targetLocation;
722 0 : }
723 :
724 : // if the "OneBasedRelativeRank" keyword was specified in the filename pattern,
725 : // perform the substitution
726 0 : searchString = "${OneBasedRelativeRank}";
727 0 : targetLocation = modifiedPattern.find(searchString);
728 0 : TLOG(TLVL_DEBUG + 33) << __func__ << ":" << __LINE__ << " searchString=" << searchString << ", targetLocation=" << targetLocation;
729 0 : while (targetLocation != std::string::npos)
730 : {
731 0 : std::ostringstream oss;
732 0 : oss << oneBasedRelativeRank;
733 0 : modifiedPattern.replace(targetLocation, searchString.length(), oss.str());
734 0 : targetLocation = modifiedPattern.find(searchString);
735 0 : TLOG(TLVL_DEBUG + 33) << __func__ << ":" << __LINE__ << " searchString=" << searchString << ", targetLocation=" << targetLocation;
736 0 : }
737 :
738 : // if the "Rank" keyword was specified in the filename pattern,
739 : // perform the substitution
740 0 : searchString = "${Rank}";
741 0 : targetLocation = modifiedPattern.find(searchString);
742 0 : TLOG(TLVL_DEBUG + 33) << __func__ << ":" << __LINE__ << " searchString=" << searchString << ", targetLocation=" << targetLocation;
743 0 : while (targetLocation != std::string::npos)
744 : {
745 0 : std::ostringstream oss;
746 0 : oss << my_rank;
747 0 : modifiedPattern.replace(targetLocation, searchString.length(), oss.str());
748 0 : targetLocation = modifiedPattern.find(searchString);
749 0 : TLOG(TLVL_DEBUG + 33) << __func__ << ":" << __LINE__ << " searchString=" << searchString << ", targetLocation=" << targetLocation;
750 0 : }
751 :
752 : // if the "Rank" keyword was specified in the filename pattern,
753 : // perform the substitution
754 0 : searchString = "${app_name}";
755 0 : targetLocation = modifiedPattern.find(searchString);
756 0 : TLOG(TLVL_DEBUG + 33) << __func__ << ":" << __LINE__ << " searchString=" << searchString << ", targetLocation=" << targetLocation;
757 0 : while (targetLocation != std::string::npos)
758 : {
759 0 : std::ostringstream oss;
760 0 : oss << artdaq::Globals::app_name_;
761 0 : modifiedPattern.replace(targetLocation, searchString.length(), oss.str());
762 0 : targetLocation = modifiedPattern.find(searchString);
763 0 : TLOG(TLVL_DEBUG + 33) << __func__ << ":" << __LINE__ << " searchString=" << searchString << ", targetLocation=" << targetLocation;
764 0 : }
765 :
766 : // if one or more free-form substitutions were provided, we'll do them here
767 0 : for (auto& sub : subs)
768 : {
769 : // first look up the replacement string for this process's app_name
770 0 : const std::string BLAH = "none_provided";
771 0 : std::string newString = BLAH;
772 0 : std::vector<Config::NewSubStringForApp> replacementList = sub.replacementList();
773 0 : for (auto& rdx : replacementList)
774 : {
775 0 : if (rdx.appName() == artdaq::Globals::app_name_)
776 : {
777 0 : newString = rdx.newString();
778 0 : break;
779 : }
780 : }
781 0 : TLOG(TLVL_DEBUG + 33) << __func__ << ": app_name=" << artdaq::Globals::app_name_ << ", newString=" << newString;
782 0 : if (newString != BLAH)
783 : {
784 : // first, add the expected surrounding text, and search for that
785 0 : searchString = "${" + sub.targetString() + "}";
786 0 : targetLocation = modifiedPattern.find(searchString);
787 0 : TLOG(TLVL_DEBUG + 33) << __func__ << ":" << __LINE__ << " searchString=" << searchString << ", targetLocation=" << targetLocation;
788 0 : while (targetLocation != std::string::npos)
789 : {
790 0 : modifiedPattern.replace(targetLocation, searchString.length(), newString);
791 0 : targetLocation = modifiedPattern.find(searchString);
792 0 : TLOG(TLVL_DEBUG + 33) << __func__ << ":" << __LINE__ << " searchString=" << searchString << ", targetLocation=" << targetLocation;
793 : }
794 :
795 : // then, search for the provided string, verbatim, in case the user specified
796 : // the enclosing text in the configuration document
797 0 : searchString = sub.targetString();
798 0 : targetLocation = modifiedPattern.find(searchString);
799 0 : TLOG(TLVL_DEBUG + 33) << __func__ << ":" << __LINE__ << " searchString=" << searchString << ", targetLocation=" << targetLocation;
800 0 : while (targetLocation != std::string::npos)
801 : {
802 0 : modifiedPattern.replace(targetLocation, searchString.length(), newString);
803 0 : targetLocation = modifiedPattern.find(searchString);
804 0 : TLOG(TLVL_DEBUG + 33) << __func__ << ":" << __LINE__ << " searchString=" << searchString << ", targetLocation=" << targetLocation;
805 : }
806 : }
807 0 : }
808 :
809 0 : TLOG(TLVL_DEBUG + 32) << __func__ << ": modifiedPattern = \"" << modifiedPattern << "\"";
810 0 : return modifiedPattern;
811 0 : }
812 :
813 : } // namespace art
814 :
815 0 : DEFINE_ART_MODULE(art::RootDAQOut) // NOLINT(performance-unnecessary-value-param)
|