Line data Source code
1 : #include "artdaq/ArtModules/RootDAQOutput-s124/RootDAQOutFile.h"
2 : #include "TRACE/tracemf.h"
3 : // vim: set sw=2 expandtab :
4 :
5 : #include "artdaq/DAQdata/Globals.hh"
6 :
7 : #include "art/Framework/Core/OutputFileGranularity.h"
8 : #include "art/Framework/Core/OutputModule.h"
9 : #include "art/Framework/IO/ClosingCriteria.h"
10 : #include "art/Framework/IO/FileStatsCollector.h"
11 : #include "art/Framework/Principal/EventPrincipal.h"
12 : #include "art/Framework/Principal/RangeSetsSupported.h"
13 : #include "art/Framework/Principal/ResultsPrincipal.h"
14 : #include "art/Framework/Principal/RunPrincipal.h"
15 : #include "art/Framework/Principal/SubRunPrincipal.h"
16 : #include "art/Framework/Services/Registry/ServiceHandle.h"
17 : #include "art/Framework/Services/System/DatabaseConnection.h"
18 : #include "art/Persistency/Provenance/ProcessHistoryRegistry.h"
19 : #include "art_root_io/DropMetaData.h"
20 : #include "art_root_io/GetFileFormatEra.h"
21 : #include "art_root_io/GetFileFormatVersion.h"
22 : #include "art_root_io/RootDB/TKeyVFSOpenPolicy.h"
23 : #include "art_root_io/RootFileBlock.h"
24 : #include "art_root_io/checkDictionaries.h"
25 : #include "art_root_io/detail/getObjectRequireDict.h"
26 : #include "boost/date_time/posix_time/posix_time.hpp"
27 : #include "canvas/Persistency/Provenance/BranchChildren.h"
28 : #include "canvas/Persistency/Provenance/BranchType.h"
29 : #include "canvas/Persistency/Provenance/EventAuxiliary.h"
30 : #include "canvas/Persistency/Provenance/EventID.h"
31 : #include "canvas/Persistency/Provenance/FileFormatVersion.h"
32 : #include "canvas/Persistency/Provenance/Parentage.h"
33 : #include "canvas/Persistency/Provenance/ParentageRegistry.h"
34 : #include "canvas/Persistency/Provenance/ProductStatus.h"
35 : #include "canvas/Persistency/Provenance/ResultsAuxiliary.h"
36 : #include "canvas/Persistency/Provenance/RunAuxiliary.h"
37 : #include "canvas/Persistency/Provenance/SubRunAuxiliary.h"
38 : #include "canvas/Persistency/Provenance/rootNames.h"
39 : #include "canvas/Utilities/Exception.h"
40 : #include "canvas_root_io/Utilities/DictionaryChecker.h"
41 : #include "cetlib/canonical_string.h"
42 : #include "cetlib/container_algorithms.h"
43 : #include "cetlib/exempt_ptr.h"
44 : #include "cetlib/sqlite/Ntuple.h"
45 : #include "cetlib/sqlite/Transaction.h"
46 : #include "cetlib/sqlite/create_table.h"
47 : #include "cetlib/sqlite/exec.h"
48 : #include "cetlib/sqlite/insert.h"
49 : #include "fhiclcpp/ParameterSetRegistry.h"
50 : #include "messagefacility/MessageLogger/MessageLogger.h"
51 : #include "range/v3/view.hpp"
52 :
53 : #include "TBranch.h"
54 :
55 : #define TRACE_NAME (app_name + "_RootDAQOutFile").c_str()
56 :
57 : #include "TFile.h"
58 : #include "TTree.h"
59 :
60 : #include <fcntl.h> // posix_fadvise POSIX_FADV_DONTNEED
61 : #include <sys/sysinfo.h> // sysinfo(sysinfo*)
62 : #include <algorithm>
63 : #include <utility>
64 : #include <vector>
65 :
66 : using namespace cet;
67 : using namespace hep::concurrency;
68 :
69 : using art::BranchType;
70 : using art::RootDAQOutFile;
71 : using art::rootNames::metaBranchRootName;
72 :
73 : using std::map;
74 : using std::string;
75 : using std::vector;
76 :
77 : namespace {
78 :
79 0 : void create_table(sqlite3* const db,
80 : string const& name,
81 : vector<string> const& columns,
82 : string const& suffix = {})
83 : {
84 0 : if (columns.empty())
85 : {
86 0 : throw art::Exception(art::errors::LogicError) // NOLINT(cert-err60-cpp)
87 0 : << "Number of sqlite columns specified for table: " << name << '\n'
88 0 : << "is zero.\n";
89 : }
90 0 : string ddl = "DROP TABLE IF EXISTS " + name +
91 : "; "
92 0 : "CREATE TABLE " +
93 0 : name + "(" + columns.front();
94 0 : for_each(columns.begin() + 1, columns.end(), [&ddl](auto const& col) {
95 0 : ddl += "," + col;
96 0 : });
97 0 : ddl += ") ";
98 0 : ddl += suffix;
99 0 : ddl += ";";
100 0 : sqlite::exec(db, ddl);
101 0 : }
102 :
103 0 : void insert_eventRanges_row(sqlite3_stmt* stmt,
104 : art::SubRunNumber_t const sr,
105 : art::EventNumber_t const b,
106 : art::EventNumber_t const e)
107 : {
108 0 : sqlite3_bind_int64(stmt, 1, sr);
109 0 : sqlite3_bind_int64(stmt, 2, b);
110 0 : sqlite3_bind_int64(stmt, 3, e);
111 0 : sqlite3_step(stmt);
112 0 : sqlite3_reset(stmt);
113 0 : }
114 :
115 0 : void insert_rangeSets_eventSets_row(sqlite3_stmt* stmt,
116 : unsigned const rsid,
117 : unsigned const esid)
118 : {
119 0 : sqlite3_bind_int64(stmt, 1, rsid);
120 0 : sqlite3_bind_int64(stmt, 2, esid);
121 0 : sqlite3_step(stmt);
122 0 : sqlite3_reset(stmt);
123 0 : }
124 :
// Append a new row for run `r` to the <BranchType>RangeSets table and
// return the ROWID sqlite assigned to it.
unsigned
getNewRangeSetID(sqlite3* db,
                 art::BranchType const bt,
                 art::RunNumber_t const r)
{
  auto const table = art::BranchTypeToString(bt) + "RangeSets";
  sqlite::insert_into(db, table).values(r);
  return sqlite3_last_insert_rowid(db);
}
134 :
135 : vector<unsigned>
136 0 : getExistingRangeSetIDs(sqlite3* db, art::RangeSet const& rs)
137 : {
138 : using namespace std;
139 0 : vector<unsigned> rangeSetIDs;
140 0 : cet::transform_all(rs, back_inserter(rangeSetIDs), [db](auto const& range) {
141 0 : sqlite::query_result<unsigned> r;
142 0 : r << sqlite::select("ROWID")
143 0 : .from(db, "EventRanges")
144 : .where("SubRun=" + to_string(range.subRun()) +
145 : " AND "
146 : "begin=" +
147 : to_string(range.begin()) +
148 : " AND "
149 : "end=" +
150 : to_string(range.end()));
151 0 : return unique_value(r);
152 0 : });
153 0 : return rangeSetIDs;
154 0 : }
155 :
156 0 : void insertIntoEventRanges(sqlite3* db, art::RangeSet const& rs)
157 : {
158 0 : sqlite::Transaction txn{db};
159 0 : sqlite3_stmt* stmt{nullptr};
160 : string const ddl{
161 : "INSERT INTO EventRanges(SubRun, begin, end) "
162 0 : "VALUES(?, ?, ?);"};
163 0 : sqlite3_prepare_v2(db, ddl.c_str(), -1, &stmt, nullptr);
164 0 : for (auto const& range : rs)
165 : {
166 0 : insert_eventRanges_row(stmt, range.subRun(), range.begin(), range.end());
167 : }
168 0 : sqlite3_finalize(stmt);
169 0 : txn.commit();
170 0 : }
171 :
172 0 : void insertIntoJoinTable(sqlite3* db,
173 : art::BranchType const bt,
174 : unsigned const rsID,
175 : vector<unsigned> const& eventRangesIDs)
176 : {
177 0 : sqlite::Transaction txn{db};
178 0 : sqlite3_stmt* stmt{nullptr};
179 : string const ddl{
180 0 : "INSERT INTO " + art::BranchTypeToString(bt) +
181 0 : "RangeSets_EventRanges(RangeSetsID, EventRangesID) Values(?,?);"};
182 0 : sqlite3_prepare_v2(db, ddl.c_str(), -1, &stmt, nullptr);
183 0 : cet::for_all(eventRangesIDs, [stmt, rsID](auto const eventRangeID) {
184 0 : insert_rangeSets_eventSets_row(stmt, rsID, eventRangeID);
185 0 : });
186 0 : sqlite3_finalize(stmt);
187 0 : txn.commit();
188 0 : }
189 :
190 0 : void maybeInvalidateRangeSet(BranchType const bt,
191 : art::RangeSet const& principalRS,
192 : art::RangeSet& productRS)
193 : {
194 0 : assert(principalRS.is_sorted());
195 0 : assert(productRS.is_sorted());
196 0 : if (!productRS.is_valid())
197 : {
198 0 : return;
199 : }
200 0 : if (bt == art::InRun && productRS.is_full_run())
201 : {
202 0 : return;
203 : }
204 0 : if (bt == art::InSubRun && productRS.is_full_subRun())
205 : {
206 0 : return;
207 : }
208 0 : if (productRS.ranges().empty())
209 : {
210 0 : return;
211 : }
212 0 : auto const r = productRS.run();
213 0 : auto const& productFront = productRS.ranges().front();
214 0 : if (!principalRS.contains(r, productFront.subRun(), productFront.begin()))
215 : {
216 0 : productRS = art::RangeSet::invalid();
217 : }
218 : }
219 :
220 : // The purpose of 'maybeInvalidateRangeSet' is to support the
221 : // following situation. Suppose process 1 creates three files with
222 : // one Run product each, all corresponding to the same Run. Let's
223 : // call the individual Run product instances in the three separate
224 : // files as A, B, and C. Now suppose that the three files serve as
225 : // inputs to process 2, where a concatenation is being performed AND
226 : // ALSO an output file switch. Process 2 results in two output
227 : // files, and now, in process 3, we concatenate the outputs from
228 : // process 2. The situation would look like this:
229 : //
230 : // Process 1: [A] [B] [C]
231 : // \ / \ /
232 : // Process 2: [A + B] [B + C]
233 : // \ / \ /
234 : // D=agg(A,B) | | E=agg(B,C)
235 : // \ /
236 : // Process 3: [D + E]
237 : //
238 : // Notice the complication in process 3: product 'B' will be
239 : // aggregated twice: once with A, and once with C. Whenever the
240 : // output from process 3 is read as input to another process, the
241 : // fetched product will be equivalent to A+2B+C.
242 : //
243 : // To avoid this situation, we compare the RangeSet of the product
244 : // with the RangeSet of the in-memory RunAuxiliary. If the
245 : // beginning of B's RangeSet is not contained within the auxiliary's
246 : // RangeSet, then a dummy product with an invalid RangeSet is
247 : // written to disk. Instead of the diagram above, we have:
248 : //
249 : // Process 1: [A] [B] [C]
250 : // \ / \ /
251 : // Process 2: [A + B] [x + C]
252 : // \ / \ /
253 : // D=agg(A,B) | | E=agg(x,C)=C
254 : // \ /
255 : // Process 3: [D + E]
256 : //
257 : // where 'x' represent a dummy product. Upon aggregating D and E,
258 : // we obtain the correctly formed A+B+C product.
259 : template<BranchType BT>
260 : art::RangeSet
261 0 : getRangeSet(art::OutputHandle const& oh,
262 : art::RangeSet const& principalRS,
263 : bool const producedInThisProcess)
264 : {
265 : if constexpr (!art::detail::range_sets_supported(BT))
266 : {
267 0 : return art::RangeSet::invalid();
268 : }
269 :
270 0 : auto rs = oh.isValid() ? oh.rangeOfValidity() : art::RangeSet::invalid();
271 : // Because a user can specify (e.g.):
272 : // r.put(std::move(myProd), art::runFragment(myRangeSet));
273 : // products that are produced in this process can have valid, yet
274 : // arbitrary RangeSets. We therefore never invalidate a RangeSet
275 : // that corresponds to a product produced in this process.
276 : //
277 : // It is possible for a user to specify a RangeSet which does not
278 : // correspond AT ALL to the in-memory auxiliary RangeSet. In that
279 : // case, users should not expect to be able to retrieve products
280 : // for which no corresponding events or sub-runs were processed.
281 0 : if (!producedInThisProcess)
282 : {
283 0 : maybeInvalidateRangeSet(BT, principalRS, rs);
284 : }
285 0 : return rs;
286 0 : }
287 :
288 : template<BranchType BT>
289 0 : void setProductRangeSetID(art::RangeSet const& rs,
290 : sqlite3* db,
291 : art::EDProduct* product,
292 : map<unsigned, unsigned>& checksumToIndexLookup)
293 : {
294 : if constexpr (!art::detail::range_sets_supported(BT))
295 : {
296 0 : return;
297 : }
298 :
299 0 : if (!rs.is_valid())
300 : { // Invalid range-sets not written to DB
301 0 : return;
302 : }
303 : // Set range sets for SubRun and Run products
304 0 : auto it = checksumToIndexLookup.find(rs.checksum());
305 0 : if (it != checksumToIndexLookup.cend())
306 : {
307 0 : product->setRangeSetID(it->second);
308 : }
309 : else
310 : {
311 0 : unsigned const rsID = getNewRangeSetID(db, BT, rs.run());
312 0 : product->setRangeSetID(rsID);
313 0 : checksumToIndexLookup.emplace(rs.checksum(), rsID);
314 0 : insertIntoEventRanges(db, rs);
315 0 : auto const& eventRangesIDs = getExistingRangeSetIDs(db, rs);
316 0 : insertIntoJoinTable(db, BT, rsID, eventRangesIDs);
317 0 : }
318 : }
319 :
320 : } // unnamed namespace
321 :
322 : namespace art {
323 :
OutputItem::~OutputItem() = default;

// An OutputItem pairs a branch description with a product pointer used
// as the ROOT branch write address; the pointer starts null and is
// filled in just before each tree fill.
OutputItem::OutputItem(BranchDescription const& bd)
  : branchDescription{bd}, product{nullptr}
{}
329 :
// Open the output ROOT file, create the metadata/index/parentage trees
// and the four per-branch-type output trees, open the embedded
// "RootFileDB" sqlite database, verify auxiliary dictionaries, and
// create the range-set tables.  Parameters are forwarded from the
// output module's configuration (compression, tree splitting/basket
// sizes, metadata-dropping policy, and free-memory thresholds used at
// destruction time).
RootDAQOutFile::RootDAQOutFile(OutputModule* om,
                               string const& fileName,
                               ClosingCriteria const& fileSwitchCriteria,
                               int const compressionLevel,
                               unsigned freePercent,
                               unsigned freeMB,
                               int64_t const saveMemoryObjectThreshold,
                               int64_t const treeMaxVirtualSize,
                               int const splitLevel,
                               int const basketSize,
                               DropMetaData dropMetaData,
                               bool const dropMetaDataForDroppedData)
  : om_{om}
  , file_{fileName}
  , fileSwitchCriteria_{fileSwitchCriteria}
  , compressionLevel_{compressionLevel}
  , freePercent_{freePercent}
  , freeMB_{freeMB}
  , saveMemoryObjectThreshold_{saveMemoryObjectThreshold}
  , treeMaxVirtualSize_{treeMaxVirtualSize}
  , splitLevel_{splitLevel}
  , basketSize_{basketSize}
  , dropMetaData_{dropMetaData}
  , dropMetaDataForDroppedData_{dropMetaDataForDroppedData}
  , filePtr_{TFile::Open(file_.c_str(), "recreate", "", compressionLevel)}
{
  using std::make_unique;
  // Don't split metadata tree or event description tree
  metaDataTree_ = RootOutputTree::makeTTree(
    filePtr_.get(), rootNames::metaDataTreeName(), 0);
  fileIndexTree_ = RootOutputTree::makeTTree(
    filePtr_.get(), rootNames::fileIndexTreeName(), 0);
  parentageTree_ = RootOutputTree::makeTTree(
    filePtr_.get(), rootNames::parentageTreeName(), 0);
  // One output tree per branch type, indexed as InEvent, InSubRun,
  // InRun, InResults.  Each tree writes through the corresponding
  // auxiliary/provenance pointer members.
  treePointers_[0] =
    make_unique<RootOutputTree>(filePtr_.get(),
                                InEvent,
                                pEventAux_,
                                pEventProductProvenanceVector_,
                                basketSize,
                                splitLevel,
                                treeMaxVirtualSize,
                                saveMemoryObjectThreshold);
  treePointers_[1] =
    make_unique<RootOutputTree>(filePtr_.get(),
                                InSubRun,
                                pSubRunAux_,
                                pSubRunProductProvenanceVector_,
                                basketSize,
                                splitLevel,
                                treeMaxVirtualSize,
                                saveMemoryObjectThreshold);
  treePointers_[2] = make_unique<RootOutputTree>(filePtr_.get(),
                                                 InRun,
                                                 pRunAux_,
                                                 pRunProductProvenanceVector_,
                                                 basketSize,
                                                 splitLevel,
                                                 treeMaxVirtualSize,
                                                 saveMemoryObjectThreshold);
  treePointers_[3] =
    make_unique<RootOutputTree>(filePtr_.get(),
                                InResults,
                                pResultsAux_,
                                pResultsProductProvenanceVector_,
                                basketSize,
                                splitLevel,
                                treeMaxVirtualSize,
                                saveMemoryObjectThreshold);
  // Open the sqlite database stored inside the ROOT file via the
  // TKeyVFS open policy.  The art-version branch reflects an API
  // change in the DatabaseConnection service return type.
#if ART_HEX_VERSION > 0x31400
  rootFileDB_ = ServiceHandle<DatabaseConnection>
  {
  } -> get<TKeyVFSOpenPolicy>("RootFileDB",
                              filePtr_.get(),
                              SQLITE_OPEN_CREATE | SQLITE_OPEN_READWRITE);
#else
  rootFileDB_.reset(ServiceHandle<DatabaseConnection> {
  } -> get<TKeyVFSOpenPolicy>("RootFileDB",
                              filePtr_.get(),
                              SQLITE_OPEN_CREATE | SQLITE_OPEN_READWRITE));
#endif
  // Start the clock used by the file-age closing criterion.
  beginTime_ = std::chrono::steady_clock::now();
  // Check that dictionaries for the auxiliaries exist
  root::DictionaryChecker checker;
  checker.checkDictionaries<EventAuxiliary>();
  checker.checkDictionaries<SubRunAuxiliary>();
  checker.checkDictionaries<RunAuxiliary>();
  checker.checkDictionaries<ResultsAuxiliary>();
  checker.reportMissingDictionaries();

  createDatabaseTables();
  TLOG(TLVL_DEBUG + 32) << "RootDAQOutFile ctor complete";
}
423 :
424 0 : art::RootDAQOutFile::~RootDAQOutFile()
425 : {
426 : struct sysinfo info;
427 0 : int sts = sysinfo(&info);
428 0 : auto free_percent = static_cast<unsigned>(info.freeram * 100 / info.totalram);
429 0 : auto free_MB = static_cast<unsigned>(info.freeram * info.mem_unit >> 20); // round down (1024.9 => 1024 MB)
430 0 : TRACE(TLVL_DEBUG + 32, "~RootDAQOutFile free %%%u %.1fMB (%u) buffers=%fGB mem_unit=%u", // NOLINT
431 : free_percent, static_cast<float>(info.freeram * info.mem_unit / (1024 * 1024.0)),
432 : free_MB, static_cast<float>(info.bufferram * info.mem_unit / (1024 * 1024 * 1024.0)), info.mem_unit);
433 0 : if (free_percent < freePercent_ || free_MB < freeMB_)
434 : {
435 0 : TLOG(TLVL_DEBUG + 32) << "RootDAQOutFile Flush/DONTNEED";
436 0 : filePtr_->Flush();
437 0 : sts = posix_fadvise(filePtr_->GetFd(), 0, 0 /*len,0=all*/, POSIX_FADV_DONTNEED);
438 : }
439 0 : TLOG(TLVL_DEBUG + 32) << "~RootDAQOutFile complete sts=" << sts;
440 0 : }
441 :
442 0 : void art::RootDAQOutFile::createDatabaseTables()
443 : {
444 : // Event ranges
445 0 : create_table(*rootFileDB_,
446 : "EventRanges",
447 : {"SubRun INTEGER",
448 : "begin INTEGER",
449 : "end INTEGER",
450 : "UNIQUE (SubRun,begin,end) ON CONFLICT IGNORE"});
451 : // SubRun range sets
452 : using namespace cet::sqlite;
453 0 : create_table(*rootFileDB_, "SubRunRangeSets", column<int>{"Run"});
454 0 : create_table(*rootFileDB_,
455 : "SubRunRangeSets_EventRanges",
456 : {"RangeSetsID INTEGER",
457 : "EventRangesID INTEGER",
458 : "PRIMARY KEY(RangeSetsID,EventRangesID)"},
459 : "WITHOUT ROWID");
460 : // Run range sets
461 0 : create_table(*rootFileDB_, "RunRangeSets", column<int>{"Run"});
462 0 : create_table(*rootFileDB_,
463 : "RunRangeSets_EventRanges",
464 : {"RangeSetsID INTEGER",
465 : "EventRangesID INTEGER",
466 : "PRIMARY KEY(RangeSetsID,EventRangesID)"},
467 : "WITHOUT ROWID");
468 0 : }
469 :
470 0 : void RootDAQOutFile::setFileStatus(OutputFileStatus const ofs)
471 : {
472 0 : std::lock_guard sentry{mutex_};
473 0 : status_ = ofs;
474 0 : }
475 :
476 : string const&
477 0 : RootDAQOutFile::currentFileName() const
478 : {
479 0 : std::lock_guard sentry{mutex_};
480 0 : return file_;
481 0 : }
482 :
483 0 : void RootDAQOutFile::selectProducts()
484 : {
485 0 : std::lock_guard sentry{mutex_};
486 0 : auto selectProductsToWrite = [this](BranchType const bt) {
487 0 : auto& items = selectedOutputItemList_[bt];
488 0 : for (auto const& pd : om_->keptProducts()[bt] | ranges::views::values)
489 : {
490 : // Persist Results products only if they have been produced by
491 : // the current process.
492 0 : if (bt == InResults && !pd.produced())
493 : {
494 0 : continue;
495 : }
496 0 : checkDictionaries(pd);
497 : // Although the transient flag is already checked when
498 : // OutputModule::doSelectProducts is called, it can be flipped
499 : // to 'true' after the BranchDescription transients have been
500 : // fluffed, which happens during the checkDictionaries call.
501 0 : if (pd.transient())
502 : {
503 0 : continue;
504 : }
505 0 : items.try_emplace(pd.productID(), pd);
506 : }
507 0 : for (auto const& item : items | ranges::views::values)
508 : {
509 0 : treePointers_[bt]->addOutputBranch(item.branchDescription,
510 0 : item.product);
511 : }
512 0 : };
513 0 : for_each_branch_type(selectProductsToWrite);
514 0 : }
515 :
516 0 : void RootDAQOutFile::beginInputFile(RootFileBlock const* rfb,
517 : FastCloningEnabled fastCloningEnabled)
518 : {
519 0 : std::lock_guard sentry{mutex_};
520 :
521 : // Create output branches, and then redo calculation to determine
522 : // if fast cloning should be done.
523 0 : selectProducts();
524 :
525 0 : cet::exempt_ptr<TTree const> inputTree{nullptr};
526 0 : if (rfb)
527 : {
528 0 : if (rfb->fileFormatVersion().value_ < 10)
529 : {
530 0 : fastCloningEnabled.disable(
531 : "The input file has a different ProductID "
532 : "schema than the in-memory schema.");
533 : }
534 0 : inputTree = rfb->tree();
535 0 : if (inputTree)
536 : {
537 0 : if (!treePointers_[InEvent]->checkSplitLevelAndBasketSize(inputTree))
538 : {
539 0 : fastCloningEnabled.disable(
540 : "The splitting level and/or basket size does not match between "
541 : "input and output file.");
542 : }
543 0 : if (inputTree->GetCurrentFile()->GetVersion() < 60001)
544 : {
545 0 : fastCloningEnabled.disable(
546 : "The ROOT version used to write the input file (< 6.00/01)\nhas a "
547 : "different splitting policy.");
548 : }
549 : }
550 : }
551 :
552 0 : if (not fastCloningEnabled)
553 : {
554 0 : mf::LogInfo("FastCloning") << fastCloningEnabled.disabledBecause();
555 0 : return;
556 : }
557 :
558 0 : mf::LogInfo("FastCloning")
559 0 : << "Fast cloning event data products from input file.";
560 0 : wasFastCloned_ = treePointers_[InEvent]->fastCloneTree(inputTree);
561 0 : }
562 :
563 0 : void RootDAQOutFile::incrementInputFileNumber()
564 : {
565 0 : std::lock_guard sentry{mutex_};
566 0 : fp_.update_inputFile();
567 0 : }
568 :
569 0 : void RootDAQOutFile::respondToCloseInputFile(FileBlock const&)
570 : {
571 0 : std::lock_guard sentry{mutex_};
572 0 : cet::for_all(treePointers_, [](auto const& p) { p->setEntries(); });
573 0 : }
574 :
575 0 : bool RootDAQOutFile::requestsToCloseFile()
576 : {
577 0 : std::lock_guard sentry{mutex_};
578 : using namespace std::chrono;
579 0 : unsigned int constexpr oneK{1024u};
580 0 : fp_.updateSize(filePtr_->GetSize() / oneK);
581 0 : fp_.updateAge(duration_cast<seconds>(steady_clock::now() - beginTime_));
582 0 : return fileSwitchCriteria_.should_close(fp_);
583 0 : }
584 :
585 0 : void RootDAQOutFile::writeOne(EventPrincipal const& e)
586 : {
587 0 : std::lock_guard sentry{mutex_};
588 0 : TLOG(TLVL_DEBUG + 33) << "Start of RootDAQOutFile::writeOne";
589 : // Note: The pEventAux_ must be set before calling fillBranches
590 : // since it gets written out in that routine.
591 0 : pEventAux_ = &e.eventAux();
592 : // Because getting the data may cause an exception to be thrown we
593 : // want to do that first before writing anything to the file about
594 : // this event.
595 0 : fillBranches<InEvent>(e, pEventProductProvenanceVector_);
596 :
597 : // Add the dataType to the job report if it hasn't already been done
598 0 : if (!dataTypeReported_)
599 : {
600 0 : string dataType{"MC"};
601 0 : if (pEventAux_->isRealData())
602 : {
603 0 : dataType = "Data";
604 : }
605 0 : dataTypeReported_ = true;
606 0 : }
607 : // Add event to index
608 0 : fileIndex_.addEntry(pEventAux_->eventID(), fp_.eventEntryNumber());
609 0 : fp_.update_event();
610 0 : TLOG(TLVL_DEBUG + 33) << "End of RootDAQOutFile::writeOne";
611 0 : }
612 :
613 0 : void RootDAQOutFile::writeSubRun(SubRunPrincipal const& sr)
614 : {
615 0 : std::lock_guard sentry{mutex_};
616 0 : pSubRunAux_ = &sr.subRunAux();
617 0 : pSubRunAux_->setRangeSetID(subRunRSID_);
618 0 : fillBranches<InSubRun>(sr, pSubRunProductProvenanceVector_);
619 0 : fileIndex_.addEntry(EventID::invalidEvent(pSubRunAux_->subRunID()),
620 : fp_.subRunEntryNumber());
621 0 : fp_.update_subRun(status_);
622 0 : }
623 :
624 0 : void RootDAQOutFile::writeRun(RunPrincipal const& r)
625 : {
626 0 : std::lock_guard sentry{mutex_};
627 0 : pRunAux_ = &r.runAux();
628 0 : pRunAux_->setRangeSetID(runRSID_);
629 0 : fillBranches<InRun>(r, pRunProductProvenanceVector_);
630 0 : fileIndex_.addEntry(EventID::invalidEvent(pRunAux_->runID()),
631 : fp_.runEntryNumber());
632 0 : fp_.update_run(status_);
633 0 : }
634 :
635 0 : void RootDAQOutFile::writeParentageRegistry()
636 : {
637 0 : std::lock_guard sentry{mutex_};
638 0 : auto pid = root::getObjectRequireDict<ParentageID>();
639 0 : ParentageID const* hash = &pid;
640 0 : if (!parentageTree_->Branch(
641 0 : rootNames::parentageIDBranchName().c_str(), &hash, basketSize_, 0))
642 : {
643 0 : throw Exception(errors::FatalRootError) // NOLINT(cert-err60-cpp)
644 0 : << "Failed to create a branch for ParentageIDs in the output file";
645 : }
646 0 : hash = nullptr;
647 0 : auto par = root::getObjectRequireDict<Parentage>();
648 0 : Parentage const* desc = ∥
649 0 : if (!parentageTree_->Branch(
650 0 : rootNames::parentageBranchName().c_str(), &desc, basketSize_, 0))
651 : {
652 0 : throw Exception(errors::FatalRootError) // NOLINT(cert-err60-cpp)
653 0 : << "Failed to create a branch for Parentages in the output file";
654 : }
655 0 : desc = nullptr;
656 0 : for (auto const& pr : ParentageRegistry::get())
657 : {
658 0 : hash = &pr.first;
659 0 : desc = &pr.second;
660 0 : parentageTree_->Fill();
661 : }
662 0 : parentageTree_->SetBranchAddress(rootNames::parentageIDBranchName().c_str(),
663 : nullptr);
664 0 : parentageTree_->SetBranchAddress(rootNames::parentageBranchName().c_str(),
665 : nullptr);
666 0 : }
667 :
668 0 : void RootDAQOutFile::writeFileFormatVersion()
669 : {
670 0 : std::lock_guard sentry{mutex_};
671 0 : FileFormatVersion const ver{getFileFormatVersion(), getFileFormatEra()};
672 0 : auto const* pver = &ver;
673 0 : TBranch* b = metaDataTree_->Branch(
674 0 : metaBranchRootName<FileFormatVersion>(), &pver, basketSize_, 0);
675 : // FIXME: Turn this into a throw!
676 0 : assert(b);
677 0 : b->Fill();
678 0 : }
679 :
680 0 : void RootDAQOutFile::writeFileIndex()
681 : {
682 0 : std::lock_guard sentry{mutex_};
683 0 : fileIndex_.sortBy_Run_SubRun_Event();
684 0 : FileIndex::Element elem{};
685 0 : auto const* findexElemPtr = &elem;
686 0 : TBranch* b = fileIndexTree_->Branch(
687 0 : metaBranchRootName<FileIndex::Element>(), &findexElemPtr, basketSize_, 0);
688 : // FIXME: Turn this into a throw!
689 0 : assert(b);
690 0 : for (auto& entry : fileIndex_)
691 : {
692 0 : findexElemPtr = &entry;
693 0 : b->Fill();
694 : }
695 0 : b->SetAddress(0);
696 0 : }
697 :
// Intentionally a no-op: process configurations are not written
// separately yet.
void RootDAQOutFile::writeProcessConfigurationRegistry()
{
  // We don't do this yet; currently we're storing a slightly
  // bloated ProcessHistoryRegistry.
}
703 :
704 0 : void RootDAQOutFile::writeProcessHistoryRegistry()
705 : {
706 0 : std::lock_guard sentry{mutex_};
707 0 : ProcessHistoryMap pHistMap;
708 0 : for (auto const& pr : ProcessHistoryRegistry::get())
709 : {
710 0 : pHistMap.emplace(pr);
711 : }
712 0 : auto const* p = &pHistMap;
713 0 : TBranch* b = metaDataTree_->Branch(
714 0 : metaBranchRootName<ProcessHistoryMap>(), &p, basketSize_, 0);
715 0 : if (b == nullptr)
716 : {
717 0 : throw Exception(errors::LogicError) // NOLINT(cert-err60-cpp)
718 : << "Unable to locate required "
719 : "ProcessHistoryMap branch in output "
720 0 : "metadata tree.\n";
721 : }
722 0 : b->Fill();
723 0 : }
724 :
// Populate the FileCatalog_metadata sqlite table inside the output
// file: first the caller-supplied metadata `md`, then a set of
// standard entries derived from the file statistics (format, times,
// runs, event counts, parents), and finally the stream-specific
// overrides `ssmd` (inserted last so they win for duplicate names —
// presumably; verify the Ntuple insert semantics against cetlib).
void RootDAQOutFile::writeFileCatalogMetadata(
  FileStatsCollector const& stats,
  FileCatalogMetadata::collection_type const& md,
  FileCatalogMetadata::collection_type const& ssmd)
{
  std::lock_guard sentry{mutex_};
  using namespace cet::sqlite;
  // (Name, Value) table; the trailing 'true' is an Ntuple ctor flag —
  // NOTE(review): looks like "overwrite existing table"; confirm.
  Ntuple<string, string> fileCatalogMetadata{
    *rootFileDB_, "FileCatalog_metadata", {{"Name", "Value"}}, true};
  for (auto const& [key, value] : md)
  {
    fileCatalogMetadata.insert(key, value);
  }

  // Add our own specific information: File format and friends.
  fileCatalogMetadata.insert("file_format", "\"artroot\"");

  // File start time.
  namespace bpt = boost::posix_time;
  auto formatted_time = [](auto const& t) {
    return cet::canonical_string(bpt::to_iso_extended_string(t));
  };
  fileCatalogMetadata.insert("start_time",
                             formatted_time(stats.outputFileOpenTime()));
  // File "end" time: now, since file is not actually closed yet.
  fileCatalogMetadata.insert(
    "end_time",
    formatted_time(boost::posix_time::second_clock::universal_time()));
  // Run/subRun information: emit a JSON-ish "runs" list only when a
  // run type is available in the incoming metadata.
  if (!stats.seenSubRuns().empty())
  {
    auto I = find_if(md.crbegin(), md.crend(), [](auto const& p) {
      return p.first == "art.run_type";
    });
    if (I != md.crend())
    {
      std::ostringstream buf;
      buf << "[ ";
      for (auto const& srid : stats.seenSubRuns())
      {
        buf << "[ " << srid.run() << ", " << srid.subRun() << ", "
            << cet::canonical_string(I->second) << " ], ";
      }
      // Rewind over last delimiter.
      buf.seekp(-2, std::ios_base::cur);
      buf << " ]";
      fileCatalogMetadata.insert("runs", buf.str());
    }
  }
  // Number of events.
  fileCatalogMetadata.insert("event_count",
                             std::to_string(stats.eventsThisFile()));
  fileCatalogMetadata.insert("first_event",
                             std::to_string(stats.lowestEventID().event()));
  fileCatalogMetadata.insert("last_event",
                             std::to_string(stats.highestEventID().event()));
  // File parents.
  if (!stats.parents().empty())
  {
    std::ostringstream pstring;
    pstring << "[ ";
    for (auto const& parent : stats.parents())
    {
      pstring << cet::canonical_string(parent) << ", ";
    }
    // Rewind over last delimiter.
    pstring.seekp(-2, std::ios_base::cur);
    pstring << " ]";
    fileCatalogMetadata.insert("parents", pstring.str());
  }

  // The following need to be encapsulated in an art table
  // first_event and last_event.
  auto eidToTuple = [](EventID const& eid) -> string {
    std::ostringstream eidStr;
    eidStr << "[ " << eid.run() << ", " << eid.subRun() << ", " << eid.event()
           << " ]";
    return eidStr.str();
  };
  fileCatalogMetadata.insert("art.first_event",
                             eidToTuple(stats.lowestEventID()));
  fileCatalogMetadata.insert("art.last_event",
                             eidToTuple(stats.highestEventID()));
  fileCatalogMetadata.insert("art.file_format_era",
                             cet::canonical_string(getFileFormatEra()));
  fileCatalogMetadata.insert("art.file_format_version",
                             std::to_string(getFileFormatVersion()));

  // Incoming stream-specific metadata overrides.
  for (auto const& [key, value] : ssmd)
  {
    fileCatalogMetadata.insert(key, value);
  }
}
819 :
820 0 : void RootDAQOutFile::writeParameterSetRegistry()
821 : {
822 0 : std::lock_guard sentry{mutex_};
823 0 : fhicl::ParameterSetRegistry::exportTo(*rootFileDB_);
824 0 : }
825 :
826 0 : void RootDAQOutFile::writeProductDescriptionRegistry()
827 : {
828 0 : std::lock_guard sentry{mutex_};
829 : // Make a local copy of the UpdateOutputCallbacks's ProductList,
830 : // removing any transient or pruned products.
831 0 : ProductRegistry reg;
832 0 : auto productDescriptionsToWrite = [this, ®](BranchType const bt) {
833 0 : for (auto const& desc :
834 0 : descriptionsToPersist_[bt] | ranges::views::values)
835 : {
836 0 : reg.productList_.emplace(BranchKey{desc}, desc);
837 : }
838 0 : };
839 0 : for_each_branch_type(productDescriptionsToWrite);
840 0 : ProductRegistry const* regp = ®
841 0 : TBranch* b = metaDataTree_->Branch(
842 0 : metaBranchRootName<ProductRegistry>(), ®p, basketSize_, 0);
843 : // FIXME: Turn this into a throw!
844 0 : assert(b);
845 0 : b->Fill();
846 0 : }
847 :
848 0 : void RootDAQOutFile::writeProductDependencies()
849 : {
850 0 : std::lock_guard sentry{mutex_};
851 0 : BranchChildren const* ppDeps = &om_->branchChildren();
852 0 : TBranch* b = metaDataTree_->Branch(
853 0 : metaBranchRootName<BranchChildren>(), &ppDeps, basketSize_, 0);
854 : // FIXME: Turn this into a throw!
855 0 : assert(b);
856 0 : b->Fill();
857 0 : }
858 :
859 0 : void RootDAQOutFile::writeResults(ResultsPrincipal& resp)
860 : {
861 0 : std::lock_guard sentry{mutex_};
862 0 : pResultsAux_ = &resp.resultsAux();
863 0 : fillBranches<InResults>(resp, pResultsProductProvenanceVector_);
864 0 : }
865 :
866 0 : void RootDAQOutFile::writeTTrees()
867 : {
868 0 : TLOG(TLVL_DEBUG + 33) << "Start of RootDAQOutFile::writeTTrees";
869 0 : std::lock_guard sentry{mutex_};
870 0 : RootOutputTree::writeTTree(metaDataTree_);
871 0 : TLOG(TLVL_DEBUG + 33) << "RootDAQOutFile::writeTTrees after writing metaDataTree_";
872 0 : RootOutputTree::writeTTree(fileIndexTree_);
873 0 : TLOG(TLVL_DEBUG + 33) << "RootDAQOutFile::writeTTrees after writing fileIndexTree_";
874 0 : RootOutputTree::writeTTree(parentageTree_);
875 0 : TLOG(TLVL_DEBUG + 33) << "RootDAQOutFile::writeTTrees after writing parentageTree_";
876 0 : for_each_branch_type(
877 0 : [this](BranchType const bt) { treePointers_[bt]->writeTree(); });
878 0 : TLOG(TLVL_DEBUG + 33) << "End of RootDAQOutFile::writeTTrees";
879 0 : }
880 :
881 0 : void RootDAQOutFile::setSubRunAuxiliaryRangeSetID(RangeSet const& ranges)
882 : {
883 0 : std::lock_guard sentry{mutex_};
884 0 : subRunRSID_ = getNewRangeSetID(*rootFileDB_, InSubRun, ranges.run());
885 0 : insertIntoEventRanges(*rootFileDB_, ranges);
886 0 : auto const& eventRangesIDs = getExistingRangeSetIDs(*rootFileDB_, ranges);
887 0 : insertIntoJoinTable(*rootFileDB_, InSubRun, subRunRSID_, eventRangesIDs);
888 0 : }
889 :
890 0 : void RootDAQOutFile::setRunAuxiliaryRangeSetID(RangeSet const& ranges)
891 : {
892 0 : std::lock_guard sentry{mutex_};
893 0 : runRSID_ = getNewRangeSetID(*rootFileDB_, InRun, ranges.run());
894 0 : insertIntoEventRanges(*rootFileDB_, ranges);
895 0 : auto const& eventRangesIDs = getExistingRangeSetIDs(*rootFileDB_, ranges);
896 0 : insertIntoJoinTable(*rootFileDB_, InRun, runRSID_, eventRangesIDs);
897 0 : }
898 :
899 : template<BranchType BT>
900 : EDProduct const*
901 0 : RootDAQOutFile::getProduct(OutputHandle const& oh,
902 : RangeSet const& prunedProductRS,
903 : string const& wrappedName)
904 : {
905 0 : std::lock_guard sentry{mutex_};
906 : if constexpr (detail::range_sets_supported(BT))
907 : {
908 0 : if (!prunedProductRS.is_valid())
909 : {
910 0 : return dummyProductCache_.product(wrappedName);
911 : }
912 : }
913 0 : return oh.isValid() ? oh.wrapper() : dummyProductCache_.product(wrappedName);
914 0 : }
915 :
// Fill the output branches for branch type BT from the given principal.
// For every selected product: record its BranchDescription, decide (per
// the DropMetaData policy) which provenance entries to keep -- walking
// parent provenance transitively when nothing is being dropped -- and,
// when the product must be resolved, attach the product pointer (real or
// dummy) and its range-set ID for writing.  On return the provenance
// vector *vpp has been filled, the tree filled, and *vpp cleared.
template<BranchType BT>
void RootDAQOutFile::fillBranches(Principal const& principal,
                                  vector<ProductProvenance>* vpp)
{
  TLOG(TLVL_DEBUG + 33) << "Start of RootDAQOutFile::fillBranches";
  std::lock_guard sentry{mutex_};
  // Fast cloning only ever applies to the event tree.
  bool const fastCloning{BT == InEvent && wasFastCloned_};
  map<unsigned, unsigned> checksumToIndex;
  auto const& principalRS = principal.seenRanges();

  // Local variables to avoid many function calls to
  // DropMetaData::operator==().
  bool const drop_no_metadata{dropMetaData_ == DropMetaData::DropNone};
  bool const drop_prior_metadata{dropMetaData_ == DropMetaData::DropPrior};
  bool const drop_all_metadata{dropMetaData_ == DropMetaData::DropAll};

  // Provenance entries that will actually be written out.
  std::set<ProductProvenance> keptprv;
  for (auto const& [pid, val] : selectedOutputItemList_[BT])
  {
    auto const& bd = val.branchDescription;
    descriptionsToPersist_[BT].try_emplace(pid, bd);
    bool const produced = bd.produced();
    // We must resolve the product unless the whole branch was carried
    // over verbatim by fast cloning.
    bool const resolveProd{produced || !fastCloning ||
                           treePointers_[BT]->uncloned(bd.branchName())};
    // Update the kept provenance: keep everything under DropNone; under
    // DropPrior keep only provenance of products produced in this job.
    bool const keepProvenance =
      drop_no_metadata || (produced && drop_prior_metadata);
    auto const& oh = principal.getForOutput(pid, resolveProd);
    // Iterator into keptprv for this product's provenance entry; set
    // below when provenance is kept, and possibly replaced later if the
    // product's status must be rewritten.
    auto prov = keptprv.begin();
    if (keepProvenance)
    {
      if (oh.productProvenance())
      {
        prov = keptprv.insert(*oh.productProvenance()).first;
        if (!drop_all_metadata && !dropMetaDataForDroppedData_)
        {
          {
            // Depth-first walk of the parentage graph, keeping the
            // provenance of produced ancestors as well.
            vector<ProductProvenance const*> stacked_pp;
            stacked_pp.push_back(&*oh.productProvenance());
            while (not empty(stacked_pp))
            {
              auto current_pp = stacked_pp.back();
              stacked_pp.pop_back();
              for (auto const parent_bid :
                   current_pp->parentage().parents())
              {
                // Note: Suppose the parent ProductID corresponds to
                // product that has been requested to be
                // "dropped"--i.e. someone has specified "drop
                // *_m1a_*_*" in their configuration, and
                // although a given product matching this
                // pattern will not be included in the
                // selectedProducts_ list, one of the parents of
                // a selected product can match the "dropping"
                // pattern and its BranchDescription will still
                // be written to disk since it is inserted into
                // the descriptionsToPersist_ data member.
                auto parent_bd = principal.getProductDescription(parent_bid);
                if (!parent_bd)
                {
                  // FIXME: Is this an error condition?
                  continue;
                }
                descriptionsToPersist_[BT].try_emplace(parent_bid,
                                                       *parent_bd);
                if (!parent_bd->produced())
                {
                  // We got it from the input, nothing to do.
                  continue;
                }
                auto parent_pp =
                  principal.branchToProductProvenance(parent_bid);
                // Ancestor provenance is only kept under DropNone.
                if (!parent_pp || !drop_no_metadata)
                {
                  continue;
                }
                if (!keptprv.insert(*parent_pp).second)
                {
                  // Already there, done.
                  continue;
                }
                if (!drop_all_metadata && !dropMetaDataForDroppedData_)
                {
                  // Newly kept: recurse into its parents too.
                  stacked_pp.push_back(parent_pp.get());
                }
              }
            }
          }
        }
      }
      else
      {
        // No provenance: product was either not produced, or was
        // dropped; create provenance to remember that.
        auto status = productstatus::dropped();
        if (produced)
        {
          status = productstatus::neverCreated();
        }
        prov = keptprv.emplace(pid, status).first;
      }
    }
    // Resolve the product if we are going to attempt to write it out.
    if (resolveProd)
    {
      // Product was either produced, or we are not cloning the whole
      // file and the product branch was not cloned so we should be
      // able to get a pointer to it from the passed principal and
      // write it out.
      auto const& rs = getRangeSet<BT>(oh, principalRS, produced);
      if (detail::range_sets_supported(BT) && !rs.is_valid())
      {
        // At this point we are now going to write out a dummy product
        // whose Wrapper present flag is false because the range set
        // got invalidated to prevent double counting when combining
        // run or subrun products from multiple fragments. We change
        // the provenance status that we are going to write out to
        // dummyToPreventDoubleCount to flag this case. Note that the
        // requirement is only that the status not be
        // productstatus::present(). We use a special code to make it
        // easier for humans to tell what is going on.
        // std::set elements are immutable, so replace the entry:
        // erase the kept provenance and re-emplace it with the
        // dummy status.
        auto prov_bid = prov->productID();
        if (keptprv.erase(*prov) != 1ull)
        {
          throw Exception(errors::LogicError, "KeptProvenance::setStatus")
            << "Attempt to set product status for product whose provenance "
               "is not being recorded.\n";
        }
        prov =
          keptprv
            .emplace(prov_bid, productstatus::dummyToPreventDoubleCount())
            .first;
      }
      auto const* product = getProduct<BT>(oh, rs, bd.wrappedName());
      setProductRangeSetID<BT>(
        rs, *rootFileDB_, const_cast<EDProduct*>(product), checksumToIndex);
      val.product = product;
    }
  }
  // Publish the kept provenance through *vpp for the tree fill below.
  vpp->assign(keptprv.begin(), keptprv.end());
  for (auto const& val : *vpp)
  {
    if (val.productStatus() == productstatus::uninitialized())
    {
      throw Exception(errors::LogicError,
                      "RootDAQOutFile::fillBranches(principal, vpp):")
        << "Attempt to write a product with uninitialized provenance!\n";
    }
  }

  TLOG(TLVL_DEBUG + 33) << "RootDAQOutFile::fillBranches before fillTree call";
  treePointers_[BT]->fillTree();
  TLOG(TLVL_DEBUG + 33) << "RootDAQOutFile::fillBranches after fillTree call";
  vpp->clear();
  TLOG(TLVL_DEBUG + 33) << "End of RootDAQOutFile::fillBranches";
}
1072 :
1073 : } // namespace art
|