Line data Source code
1 : #ifndef ARTDAQ_ARTDAQ_ARTMODULES_ARTDAQINPUTHELPER_HH_
2 : #define ARTDAQ_ARTDAQ_ARTMODULES_ARTDAQINPUTHELPER_HH_
3 :
4 : #include "TRACE/tracemf.h" // Pre-empt TRACE/trace.h from Fragment.hh.
5 :
6 : #include "artdaq-core/Data/Fragment.hh"
7 : #include "artdaq/ArtModules/InputUtilities.hh"
8 :
9 : #include "artdaq-core/Data/MetadataFragment.hh"
10 : #include "artdaq-core/Data/detail/ParentageMap.hh"
11 : #include "artdaq-core/Utilities/TimeUtils.hh"
12 : #include "artdaq/ArtModules/ArtdaqFragmentNamingService.h"
13 : #include "artdaq/ArtModules/ArtdaqSharedMemoryServiceInterface.h"
14 : #include "artdaq/DAQdata/Globals.hh"
15 : #include "artdaq/DAQdata/NetMonHeader.hh"
16 :
17 : #include "art/Framework/Core/FileBlock.h"
18 : #include "art/Framework/Core/ProductRegistryHelper.h"
19 : #include "art/Framework/IO/Sources/SourceHelper.h"
20 : #include "art/Framework/IO/Sources/put_product_in_principal.h"
21 : #include "art/Framework/Principal/EventPrincipal.h"
22 : #include "art/Framework/Principal/RunPrincipal.h"
23 : #include "art/Framework/Principal/SubRunPrincipal.h"
24 : #include "art/Framework/Services/Registry/ServiceHandle.h"
25 :
26 : #include "art/Persistency/Provenance/ProcessHistoryRegistry.h"
27 : #include "art_root_io/setup.h"
28 :
29 : #include "canvas/Persistency/Common/EDProduct.h"
30 : #include "canvas/Persistency/Provenance/BranchDescription.h"
31 : #include "canvas/Persistency/Provenance/BranchKey.h"
32 : #include "canvas/Persistency/Provenance/FileFormatVersion.h"
33 : #if ART_HEX_VERSION < 0x31100
34 : #include "canvas/Persistency/Provenance/History.h"
35 : #else
36 : #include "canvas/Persistency/Provenance/Compatibility/History.h"
37 : #endif
38 : #include "canvas/Persistency/Provenance/ParentageRegistry.h"
39 : #include "canvas/Persistency/Provenance/ProcessHistory.h"
40 : #include "canvas/Persistency/Provenance/ProcessHistoryID.h"
41 : #include "canvas/Persistency/Provenance/ProductList.h"
42 : #include "canvas/Persistency/Provenance/ProductProvenance.h"
43 :
44 : #include "fhiclcpp/ParameterSet.h"
45 : #include "fhiclcpp/ParameterSetID.h"
46 : #include "fhiclcpp/ParameterSetRegistry.h"
47 :
48 : #include <TBufferFile.h>
49 : #include <TClass.h>
50 : #include <TList.h>
51 : #include <TStreamerInfo.h>
52 :
53 : #include <sys/time.h>
54 : #include <chrono>
55 : #include <cstdio>
56 : #include <iomanip>
57 : #include <iostream>
58 : #include <list>
59 : #include <map>
60 : #include <memory>
61 : #include <set>
62 : #include <sstream>
63 : #include <string>
64 : #include <unordered_map>
65 : #include <utility>
66 : #include <vector>
67 :
68 : #define CAN_REINIT 0
69 :
namespace art {
// Forward declaration so the class template can be defined with its
// fully-qualified name (art::ArtdaqInputHelper) outside the namespace block.
template<typename U>
class ArtdaqInputHelper;
}  // namespace art
74 :
75 : /**
76 : * \brief This template class provides a unified interface for reading data into art
77 : * \tparam U The class responsible for delivering data
78 : *
79 : * JCF, May-27-2016
80 : * ArtdaqInputHelper is a template class which takes, as a parameter, a
81 : * class which it uses to receive data; the instance of this class is
82 : * called "communicationWrapper_". As of this writing, this wrapper
83 : * class is implemented by NetMonWrapper (for reading data into the
84 : * aggregator from the eventbuilder) and TransferWrapper (for reading
85 : * data into an art process). This class presents a unified approach
86 : * to handling art provenance, regardless of the communication
87 : * protocol used to read data in.
88 : */
89 : template<typename U>
90 : class art::ArtdaqInputHelper
91 : {
92 : public:
93 : /**
94 : * \brief Copy Constructor is deleted
95 : */
96 : ArtdaqInputHelper(const ArtdaqInputHelper&) = delete;
97 :
98 : /**
99 : * \brief Copy Assignment operator is deleted
100 : * \return ArtdaqInputHelper copy
101 : */
102 : ArtdaqInputHelper& operator=(const ArtdaqInputHelper&) = delete;
103 :
104 : /**
105 : * \brief ArtdaqInputHelper Destructor
106 : */
107 : ~ArtdaqInputHelper();
108 :
109 : /**
110 : * \brief ArtdaqInputHelper Constructor
111 : * \param ps ParameterSet used to confiugre communication wrapper class
112 : * \param helper An art::ProductRegistryHelper for registering products
113 : * \param pm An art::SourceHelper for handling provenance
114 : */
115 : ArtdaqInputHelper(const fhicl::ParameterSet& ps, art::ProductRegistryHelper& helper, art::SourceHelper const& pm);
116 :
117 : /**
118 : * \brief Called by art to close the input source. No-Op
119 : */
120 : void closeCurrentFile();
121 :
122 : /**
123 : * \brief Emulate reading a file
124 : * \param fb Output art::FileBlock object
125 : */
126 : void readFile(const std::string&, art::FileBlock*& fb);
127 :
128 : /**
129 : * \brief Whether additional events are expected from the source
130 : * \return True if ArtdaqInputHelper has not been shut down
131 : */
132 : bool hasMoreData() const;
133 :
134 : /**
135 : * \brief Read the next event from the communication wrapper
136 : * \param inR RunPrincipal input pointer
137 : * \param inSR SubRunPrincipal input pointer
138 : * \param outR RunPrincipal output pointer
139 : * \param outSR SubRunPrincipal output pointer
140 : * \param outE EventPrincipal output pointer
141 : * \return Whether an event was successfully read from the communication wrapper
142 : */
143 : bool readNext(art::RunPrincipal* const inR, art::SubRunPrincipal* const inSR, art::RunPrincipal*& outR,
144 : art::SubRunPrincipal*& outSR, art::EventPrincipal*& outE);
145 :
146 : private:
147 : ArtdaqInputHelper(ArtdaqInputHelper&&) = delete;
148 : ArtdaqInputHelper& operator=(ArtdaqInputHelper&&) = delete;
149 :
150 : void readAndConstructPrincipal(std::unique_ptr<TBufferFile>&, artdaq::NetMonHeader::MessageType, art::RunPrincipal* const,
151 : art::SubRunPrincipal* const, art::RunPrincipal*&, art::SubRunPrincipal*&,
152 : art::EventPrincipal*&);
153 :
154 : bool constructPrincipal(std::shared_ptr<ArtdaqEvent>, art::RunPrincipal* const,
155 : art::SubRunPrincipal* const, art::RunPrincipal*&, art::SubRunPrincipal*&,
156 : art::EventPrincipal*&);
157 :
158 : template<class T>
159 : void readDataProducts(std::list<std::unique_ptr<TBufferFile>>&, T* const&);
160 :
161 : void putInPrincipal(RunPrincipal* const&, std::unique_ptr<EDProduct>&&, const BranchDescription&,
162 : std::unique_ptr<const ProductProvenance>&&);
163 :
164 : void putInPrincipal(SubRunPrincipal* const&, std::unique_ptr<EDProduct>&&, const BranchDescription&,
165 : std::unique_ptr<const ProductProvenance>&&);
166 :
167 : void putInPrincipal(EventPrincipal* const&, std::unique_ptr<EDProduct>&&, const BranchDescription&,
168 : std::unique_ptr<const ProductProvenance>&&);
169 :
170 : std::pair<bool, bool> readFragments(std::unordered_map<artdaq::Fragment::type_t, std::unique_ptr<artdaq::Fragments>> const& eventMap, art::RunPrincipal* const theRun, art::SubRunPrincipal* const theSubRun, art::EventPrincipal* const theEvent);
171 :
172 : void readInitMessage();
173 :
174 : bool shutdownMsgReceived_;
175 : art::SourceHelper const& pm_;
176 : art::ProductRegistryHelper& helper_;
177 : U communicationWrapper_;
178 : ProductList* productList_;
179 : std::unique_ptr<art::History> history_to_use_;
180 : bool fragmentsOnlyMode_;
181 : std::string pretend_module_name; ///< The module name to store data under
182 : size_t bytesRead; ///< running total of number of bytes received
183 : std::chrono::steady_clock::time_point last_read_time; ///< Time last read was completed
184 : };
185 :
186 : template<typename U>
187 0 : art::ArtdaqInputHelper<U>::ArtdaqInputHelper(const fhicl::ParameterSet& ps, art::ProductRegistryHelper& helper,
188 : art::SourceHelper const& pm)
189 0 : : shutdownMsgReceived_(false)
190 0 : , pm_(pm)
191 0 : , helper_(helper)
192 0 : , communicationWrapper_(ps)
193 0 : , productList_()
194 0 : , fragmentsOnlyMode_(false)
195 0 : , pretend_module_name(ps.get<std::string>("raw_data_label", "daq"))
196 0 : , bytesRead(0)
197 0 : , last_read_time(std::chrono::steady_clock::now())
198 : {
199 0 : root::setup();
200 : // Instantiate ArtdaqSharedMemoryService to set up artdaq Globals and MetricManager
201 0 : art::ServiceHandle<ArtdaqSharedMemoryServiceInterface> shm;
202 :
203 : #if 0
204 : volatile bool loop = true;
205 : while (loop)
206 : {
207 : usleep(1000);
208 : }
209 : #endif
210 :
211 : // JCF, May-27-2016
212 :
213 : // Something will have to be done about the labeling of this class,
214 : // since it's just a template class- the user will care about the
215 : // specific instantiation when it comes to messages
216 :
217 0 : TLOG(TLVL_DEBUG + 33, "ArtdaqInputHelper") << "Begin: ArtdaqInputHelper::ArtdaqInputHelper("
218 0 : << "const fhicl::ParameterSet& ps, "
219 0 : << "art::ProductRegistryHelper& helper, "
220 0 : << "const art::SourceHelper& pm)";
221 0 : readInitMessage();
222 :
223 0 : helper.reconstitutes<artdaq::detail::RawEventHeader, art::InEvent>(pretend_module_name, "RawEventHeader");
224 :
225 0 : if (ps.get<bool>("register_fragment_types", true))
226 : {
227 0 : TLOG(TLVL_DEBUG + 32, "ArtdaqInputHelper") << "Registering known Fragment labels from ArtdaqFragmentNamingServiceInterface";
228 :
229 0 : art::ServiceHandle<ArtdaqFragmentNamingServiceInterface> translator;
230 0 : helper.reconstitutes<artdaq::Fragments, art::InEvent>(pretend_module_name, translator->GetUnidentifiedInstanceName());
231 : // Workaround for #22979
232 0 : helper.reconstitutes<artdaq::Fragments, art::InRun>(pretend_module_name, translator->GetUnidentifiedInstanceName());
233 0 : helper.reconstitutes<artdaq::Fragments, art::InSubRun>(pretend_module_name, translator->GetUnidentifiedInstanceName());
234 :
235 0 : helper.reconstitutes<std::vector<artdaq::ArtdaqMetadata>, art::InRun>(pretend_module_name, translator->GetUnidentifiedInstanceName());
236 0 : helper.reconstitutes<std::vector<artdaq::ArtdaqMetadata>, art::InSubRun>(pretend_module_name, translator->GetUnidentifiedInstanceName());
237 0 : helper.reconstitutes<std::vector<artdaq::ArtdaqMetadata>, art::InRun>(pretend_module_name, "StartOfRun");
238 0 : helper.reconstitutes<std::vector<artdaq::ArtdaqMetadata>, art::InRun>(pretend_module_name, "EndOfRun");
239 0 : helper.reconstitutes<std::vector<artdaq::ArtdaqMetadata>, art::InSubRun>(pretend_module_name, "StartOfSubrun");
240 0 : helper.reconstitutes<std::vector<artdaq::ArtdaqMetadata>, art::InSubRun>(pretend_module_name, "EndOfSubrun");
241 :
242 0 : std::set<std::string> instance_names = translator->GetAllProductInstanceNames();
243 0 : for (const auto& set_iter : instance_names)
244 : {
245 0 : helper.reconstitutes<artdaq::Fragments, art::InEvent>(pretend_module_name, set_iter);
246 : }
247 0 : }
248 : //
249 : // Finished with init message.
250 : //
251 0 : TLOG(TLVL_DEBUG + 33, "ArtdaqInputHelper") << "End: ArtdaqInputHelper::ArtdaqInputHelper("
252 0 : << "const fhicl::ParameterSet& ps, "
253 0 : << "art::ProductRegistryHelper& helper, "
254 0 : << "const art::SourceHelper& pm)";
255 0 : }
256 :
257 : template<typename U>
258 0 : void art::ArtdaqInputHelper<U>::readInitMessage()
259 : {
260 0 : TLOG(TLVL_DEBUG + 33, "ArtdaqInputHelper") << "Going to receive init message";
261 0 : artdaq::FragmentPtrs initFrags = communicationWrapper_.receiveInitMessage();
262 0 : TLOG(TLVL_DEBUG + 33, "ArtdaqInputHelper") << "Init message received";
263 :
264 0 : if (!initFrags.empty() && initFrags.front().get()->type() == artdaq::Fragment::EndOfDataFragmentType)
265 : {
266 0 : TLOG_ERROR("ArtdaqInputHelper") << "Received EndOfData as first broadcast! This process neveer received any data!";
267 0 : shutdownMsgReceived_ = true;
268 : }
269 : else
270 : {
271 0 : if (initFrags.empty() || initFrags.back().get()->dataSize() == 0)
272 : {
273 0 : TLOG(TLVL_DEBUG + 32, "ArtdaqInputHelper") << "No init message received or zero-size init message: Fragments-only mode activated! This is an EventBuilder!";
274 0 : fragmentsOnlyMode_ = true;
275 : }
276 : else
277 : {
278 0 : std::list<std::unique_ptr<TBufferFile>> msgs;
279 0 : for (auto& initFrag : initFrags)
280 : {
281 0 : auto header = initFrag->metadata<artdaq::NetMonHeader>();
282 0 : msgs.emplace_back(new TBufferFile(TBuffer::kRead, header->data_length, initFrag->dataBegin(), kFALSE, nullptr));
283 : }
284 :
285 0 : std::set<art::ProcessHistoryID> history_ids;
286 :
287 0 : for (auto& msg : msgs)
288 : {
289 : // This first unsigned long is the message type code, ignored here in the constructor
290 0 : ULong_t dummy = 0;
291 0 : msg->ReadULong(dummy);
292 :
293 : // ELF: 6/11/2019: This code is taken from TSocket::RecvStreamerInfos
294 0 : auto list = dynamic_cast<TList*>(msg->ReadObject(TList::Class()));
295 :
296 0 : TIter next(list);
297 : TStreamerInfo* info;
298 0 : TObjLink* lnk = list->FirstLink();
299 : // First call BuildCheck for regular class
300 0 : while (lnk)
301 : {
302 0 : info = dynamic_cast<TStreamerInfo*>(lnk->GetObject());
303 0 : TObject* element = info->GetElements()->UncheckedAt(0);
304 0 : Bool_t isstl = element && strcmp("This", element->GetName()) == 0;
305 0 : if (!isstl)
306 : {
307 0 : info->BuildCheck();
308 0 : TLOG(TLVL_DEBUG + 33, "ArtdaqInputHelper") << "ArtdaqInputHelper: importing TStreamerInfo: " << info->GetName() << ", version = " << info->GetClassVersion();
309 : }
310 0 : lnk = lnk->Next();
311 : }
312 : // Then call BuildCheck for stl class
313 0 : lnk = list->FirstLink();
314 0 : while (lnk)
315 : {
316 0 : info = dynamic_cast<TStreamerInfo*>(lnk->GetObject());
317 0 : TObject* element = info->GetElements()->UncheckedAt(0);
318 0 : Bool_t isstl = element && strcmp("This", element->GetName()) == 0;
319 0 : if (isstl)
320 : {
321 0 : info->BuildCheck();
322 0 : TLOG(TLVL_DEBUG + 33, "ArtdaqInputHelper") << "ArtdaqInputHelper: importing TStreamerInfo: " << info->GetName() << ", version = " << info->GetClassVersion();
323 : }
324 0 : lnk = lnk->Next();
325 : }
326 : // ELF: 6/11/2019: End TSocket snippet
327 :
328 : //
329 : // Read the ParameterSetRegistry.
330 : //
331 0 : ULong_t ps_cnt = 0;
332 0 : msg->ReadULong(ps_cnt);
333 0 : TLOG(TLVL_DEBUG + 33, "ArtdaqInputHelper") << "ArtdaqInputHelper: parameter set count: " << ps_cnt;
334 0 : TLOG(TLVL_DEBUG + 33, "ArtdaqInputHelper") << "ArtdaqInputHelper: reading parameter sets ...";
335 0 : for (ULong_t I = 0; I < ps_cnt; ++I)
336 : {
337 0 : std::string pset_str = ""; // = ReadObjectAny<std::string>(msg, "std::string", "ArtdaqInputHelper::ArtdaqInputHelper");
338 0 : msg->ReadStdString(pset_str);
339 :
340 0 : TLOG(TLVL_DEBUG + 33, "ArtdaqInputHelper") << "ArtdaqInputHelper: parameter set: " << pset_str;
341 :
342 0 : fhicl::ParameterSet pset;
343 0 : pset = fhicl::ParameterSet::make(pset_str);
344 : // Force id calculation.
345 0 : pset.id();
346 0 : fhicl::ParameterSetRegistry::put(pset);
347 : }
348 0 : TLOG(TLVL_DEBUG + 33, "ArtdaqInputHelper") << "ArtdaqInputHelper: finished reading parameter sets.";
349 :
350 : //
351 : // Read the MasterProductRegistry.
352 : //
353 0 : auto thisProductList = ReadObjectAny<art::ProductList>(
354 : msg, "std::map<art::BranchKey,art::BranchDescription>", "ArtdaqInputHelper::ArtdaqInputHelper");
355 0 : TLOG(TLVL_DEBUG + 33, "ArtdaqInputHelper") << "ArtdaqInputHelper: Input Product list sz=" << thisProductList->size();
356 :
357 0 : bool productListInitialized = productList_ != nullptr;
358 0 : if (!productListInitialized) productList_ = thisProductList;
359 0 : for (auto I = thisProductList->begin(), E = thisProductList->end(); I != E; ++I)
360 : {
361 : #ifndef __OPTIMIZE__
362 0 : TLOG(TLVL_DEBUG + 50, "ArtdaqInputHelper") << "Branch key: class: '" << I->first.friendlyClassName_ << "' modlbl: '"
363 0 : << I->first.moduleLabel_ << "' instnm: '" << I->first.productInstanceName_ << "' procnm: '"
364 0 : << I->first.processName_ << "', branch description name: " << I->second.wrappedName();
365 : #endif
366 0 : if (productListInitialized)
367 : {
368 0 : productList_->emplace(*I);
369 : }
370 : }
371 :
372 0 : TLOG(TLVL_DEBUG + 33, "ArtdaqInputHelper") << "ArtdaqInputHelper: Reading ProcessHistory";
373 0 : auto phm = ReadObjectAny<art::ProcessHistoryMap>(
374 : msg, "std::map<const art::Hash<2>,art::ProcessHistory>", "ArtdaqInputHelper::ArtdaqInputHelper");
375 0 : printProcessMap(*phm, "ArtdaqInputHelper's ProcessHistoryMap");
376 :
377 0 : for (auto& proc_hist : *phm)
378 : {
379 0 : history_ids.insert(proc_hist.second.id());
380 : }
381 :
382 0 : ProcessHistoryRegistry::put(*phm);
383 0 : printProcessMap(ProcessHistoryRegistry::get(), "ArtdaqInputHelper's ProcessHistoryRegistry");
384 :
385 : //
386 : // Read the ParentageRegistry.
387 : //
388 0 : TLOG(TLVL_DEBUG + 33, "ArtdaqInputHelper") << "ArtdaqInputHelper: Reading ParentageMap";
389 0 : auto parentageMap = ReadObjectAny<ParentageMap>(msg, "art::ParentageMap", "ArtdaqInputHelper::ArtdaqInputHelper");
390 0 : ParentageRegistry::put(*parentageMap);
391 : }
392 :
393 : // We're going to make a fake History using the collected process histories!
394 0 : art::ProcessHistory fake_process_history;
395 0 : for (auto& hist : history_ids)
396 : {
397 0 : if (!hist.isValid())
398 : {
399 0 : TLOG(TLVL_WARNING, "ArtdaqInputHelper") << "Encountered invalid history ID!";
400 0 : continue;
401 0 : }
402 :
403 0 : ProcessHistory thisProcessHistory;
404 0 : if (ProcessHistoryRegistry::get(hist, thisProcessHistory))
405 : {
406 0 : for (auto& conf : thisProcessHistory)
407 : {
408 0 : if (auto e = fake_process_history.end();
409 0 : std::find(fake_process_history.begin(), e, conf) == e)
410 : {
411 0 : fake_process_history.push_back(conf);
412 : }
413 : }
414 : }
415 : }
416 0 : art::ProcessHistoryMap fake_process_history_map;
417 0 : fake_process_history_map[fake_process_history.id()] = fake_process_history;
418 0 : ProcessHistoryRegistry::put(fake_process_history_map);
419 0 : printProcessMap(ProcessHistoryRegistry::get(), "ArtdaqInputHelper's ProcessHistoryRegistry w/fake history");
420 :
421 0 : history_to_use_.reset(new History());
422 0 : history_to_use_->setProcessHistoryID(fake_process_history.id());
423 :
424 0 : TLOG(TLVL_DEBUG + 33, "ArtdaqInputHelper")
425 0 : << "ArtdaqInputHelper: Product list sz=" << productList_->size();
426 : #if 0
427 : for (auto I = productList_->begin(), E = productList_->end(); I != E; ++I)
428 : {
429 : TLOG(TLVL_DEBUG + 50, "ArtdaqInputHelper") << "Branch key: class: '" << I->first.friendlyClassName_ << "' modlbl: '"
430 : << I->first.moduleLabel_ << "' instnm: '" << I->first.productInstanceName_ << "' procnm: '"
431 : << I->first.processName_ << "', branch description name: " << I->second.wrappedName();
432 : }
433 : #endif
434 : // helper now owns productList_!
435 :
436 0 : helper_.productList(std::unique_ptr<art::ProductList>(productList_));
437 :
438 0 : TLOG(TLVL_DEBUG + 33, "ArtdaqInputHelper") << "ArtdaqInputHelper: got product list";
439 0 : }
440 : }
441 0 : }
442 :
443 : template<typename U>
444 0 : art::ArtdaqInputHelper<U>::~ArtdaqInputHelper()
445 0 : {}
446 :
447 : template<typename U>
448 0 : void art::ArtdaqInputHelper<U>::closeCurrentFile()
449 : {
450 0 : TLOG(TLVL_DEBUG + 34, "ArtdaqInputHelper") << "Begin/End: ArtdaqInputHelper::closeCurrentFile()";
451 0 : }
452 :
453 : template<typename U>
454 0 : void art::ArtdaqInputHelper<U>::readFile(const std::string&, art::FileBlock*& fb)
455 : {
456 0 : TLOG(TLVL_DEBUG + 35, "ArtdaqInputHelper") << "Begin: ArtdaqInputHelper::"
457 : "readFile(const std::string& name, art::FileBlock*& fb)";
458 0 : fb = new art::FileBlock(art::FileFormatVersion(1, "ArtdaqInputHelper2013"), "");
459 0 : TLOG(TLVL_DEBUG + 35, "ArtdaqInputHelper") << "End: ArtdaqInputHelper::"
460 : "readFile(const std::string& name, art::FileBlock*& fb)";
461 0 : }
462 :
463 : template<typename U>
464 0 : bool art::ArtdaqInputHelper<U>::hasMoreData() const
465 : {
466 0 : TLOG(TLVL_DEBUG + 36, "ArtdaqInputHelper") << "Begin: ArtdaqInputHelper::hasMoreData()";
467 0 : if (shutdownMsgReceived_)
468 : {
469 0 : TLOG(TLVL_DEBUG + 36, "ArtdaqInputHelper") << "ArtdaqInputHelper::hasMoreData(): "
470 : "returning false on shutdownMsgReceived_.";
471 0 : TLOG(TLVL_DEBUG + 36, "ArtdaqInputHelper") << "End: ArtdaqInputHelper::hasMoreData()";
472 0 : return false;
473 : }
474 0 : TLOG(TLVL_DEBUG + 32, "ArtdaqInputHelper") << "ArtdaqInputHelper::hasMoreData(): "
475 : "returning true on not shutdownMsgReceived_.";
476 0 : TLOG(TLVL_DEBUG + 36, "ArtdaqInputHelper") << "End: ArtdaqInputHelper::hasMoreData()";
477 0 : return true;
478 : }
479 :
480 : template<typename U>
481 0 : void art::ArtdaqInputHelper<U>::readAndConstructPrincipal(std::unique_ptr<TBufferFile>& msg, artdaq::NetMonHeader::MessageType msg_type_code,
482 : art::RunPrincipal* const inR, art::SubRunPrincipal* const inSR,
483 : art::RunPrincipal*& outR, art::SubRunPrincipal*& outSR,
484 : art::EventPrincipal*& outE)
485 : {
486 : //
487 : // Process the message.
488 : //
489 0 : std::unique_ptr<art::RunAuxiliary> run_aux;
490 0 : std::unique_ptr<art::SubRunAuxiliary> subrun_aux;
491 0 : std::unique_ptr<art::EventAuxiliary> event_aux;
492 :
493 : // Establish default 'results'
494 0 : outR = nullptr;
495 0 : outSR = nullptr;
496 0 : outE = nullptr;
497 :
498 0 : art::Timestamp currentTime = 0;
499 : timespec hi_res_time;
500 0 : int retcode = clock_gettime(CLOCK_REALTIME, &hi_res_time);
501 0 : TLOG(TLVL_DEBUG + 43, "ArtdaqInputHelper") << "hi_res_time tv_sec = " << hi_res_time.tv_sec
502 0 : << " tv_nsec = " << hi_res_time.tv_nsec << " (retcode = " << retcode << ")";
503 0 : if (retcode == 0)
504 : {
505 0 : currentTime = ((hi_res_time.tv_sec & 0xffffffff) << 32) | (hi_res_time.tv_nsec & 0xffffffff);
506 : }
507 : else
508 : {
509 0 : TLOG_ERROR("ArtdaqInputHelper")
510 0 : << "Unable to fetch a high-resolution time with clock_gettime for art::SubRun Timestamp. ";
511 : }
512 :
513 0 : TLOG(TLVL_DEBUG + 37, "ArtdaqInputHelper") << "inR: " << static_cast<void*>(inR) << " run " << (inR ? std::to_string(inR->run()) : "invalid")
514 0 : << ", inSR: " << static_cast<void*>(inSR) << " run " << (inSR ? std::to_string(inSR->run()) : "invalid")
515 0 : << ", subrun " << (inSR ? std::to_string(inSR->subRun()) : "invalid");
516 :
517 : // Process Run Aux
518 0 : TLOG(TLVL_DEBUG + 37, "ArtdaqInputHelper") << "readAndConstructPrincipal: "
519 0 : << "processing Run auxiliary ...";
520 :
521 0 : run_aux.reset(ReadObjectAny<art::RunAuxiliary>(msg, "art::RunAuxiliary", "ArtdaqInputHelper::readAndConstructPrincipal"));
522 0 : run_aux->setProcessHistoryID(history_to_use_->processHistoryID());
523 0 : printProcessHistoryID("readAndConstructPrincipal", run_aux.get());
524 :
525 0 : TLOG(TLVL_DEBUG + 39, "ArtdaqInputHelper") << "readAndConstructPrincipal: "
526 0 : << "inR: " << static_cast<void*>(inR) << " run/expected "
527 0 : << (inR ? std::to_string(inR->run()) : "invalid") << "/" << run_aux->run();
528 :
529 0 : if ((inR == nullptr) || !inR->runID().isValid() || (inR->run() != run_aux->run()))
530 : {
531 : // New run, either we have no input RunPrincipal, or the
532 : // input run number does not match the run number.
533 0 : TLOG(TLVL_DEBUG + 39, "ArtdaqInputHelper") << "readAndConstructPrincipal: making RunPrincipal ...";
534 0 : outR = pm_.makeRunPrincipal(*run_aux);
535 : }
536 :
537 0 : TLOG(TLVL_DEBUG + 39, "ArtdaqInputHelper") << "readAndConstructPrincipal: "
538 0 : << "finished processing Run auxiliary.";
539 :
540 0 : if (msg_type_code != artdaq::NetMonHeader::MessageType::Run) // SubRun or Event
541 : {
542 0 : TLOG(TLVL_DEBUG + 38, "ArtdaqInputHelper") << "readAndConstructPrincipal: "
543 0 : << "processing SubRun auxiliary ...";
544 :
545 0 : subrun_aux.reset(
546 0 : ReadObjectAny<art::SubRunAuxiliary>(msg, "art::SubRunAuxiliary", "ArtdaqInputHelper::readAndConstructPrincipal"));
547 0 : printProcessHistoryID("readAndConstructPrincipal", subrun_aux.get());
548 :
549 : // HACK! Make the SR PHID match!
550 0 : printProcessHistoryID("readAndConstructPrincipal", subrun_aux.get());
551 0 : subrun_aux->setProcessHistoryID(run_aux->processHistoryID());
552 0 : printProcessHistoryID("readAndConstructPrincipal", subrun_aux.get());
553 :
554 0 : TLOG(TLVL_DEBUG + 39, "ArtdaqInputHelper") << "readAndConstructPrincipal: "
555 0 : << "inSR: " << static_cast<void*>(inSR) << " run/expected "
556 0 : << (inSR ? std::to_string(inSR->run()) : "invalid") << "/" << subrun_aux->run()
557 0 : << ", subrun/expected " << (inSR ? std::to_string(inSR->subRun()) : "invalid") << "/"
558 0 : << subrun_aux->subRun();
559 :
560 0 : art::SubRunID subrun_check(subrun_aux->run(), subrun_aux->subRun());
561 0 : if (inSR == nullptr || !inSR->subRunID().isValid() || subrun_check != inSR->subRunID())
562 : {
563 : // New SubRun, either we have no input SubRunPrincipal, or the
564 : // input subRun number does not match the subRun number.
565 0 : TLOG(TLVL_DEBUG + 39, "ArtdaqInputHelper") << "readAndConstructPrincipal: "
566 0 : << "making SubRunPrincipal ...";
567 0 : outSR = pm_.makeSubRunPrincipal(*subrun_aux);
568 : }
569 :
570 0 : TLOG(TLVL_DEBUG + 39, "ArtdaqInputHelper") << "readAndConstructPrincipal: "
571 0 : << "finished processing SubRun auxiliary.";
572 : }
573 :
574 0 : if (msg_type_code == artdaq::NetMonHeader::MessageType::Event)
575 : { // Event message.
576 :
577 0 : TLOG(TLVL_DEBUG + 39, "ArtdaqInputHelper") << "readAndConstructPrincipal: "
578 0 : << "processing Event auxiliary ...";
579 :
580 0 : event_aux.reset(
581 0 : ReadObjectAny<art::EventAuxiliary>(msg, "art::EventAuxiliary", "ArtdaqInputHelper::readAndConstructPrincipal"));
582 :
583 0 : TLOG(TLVL_DEBUG + 34, "ArtdaqInputHelper") << "readAndConstructPrincipal: making EventPrincipal ...";
584 : #if ART_HEX_VERSION < 0x31100
585 : auto historyPtr = std::unique_ptr<art::History>(new History(*(history_to_use_.get())));
586 : if (!art::ProcessHistoryRegistry::get().count(history_to_use_->processHistoryID()))
587 : {
588 : TLOG_WARNING("ArtdaqInputHelper") << "Stored history is not in ProcessHistoryRegistry, this event may have issues!";
589 : }
590 : outE = pm_.makeEventPrincipal(*event_aux, std::move(historyPtr));
591 : #else
592 0 : if (!art::ProcessHistoryRegistry::get().count(history_to_use_->processHistoryID()))
593 : {
594 0 : TLOG_WARNING("ArtdaqInputHelper") << "Stored history is not in ProcessHistoryRegistry, this event may have issues!";
595 : }
596 0 : event_aux->setProcessHistoryID(history_to_use_->processHistoryID());
597 0 : outE = pm_.makeEventPrincipal(*event_aux);
598 : #endif
599 0 : TLOG(TLVL_DEBUG + 39, "ArtdaqInputHelper") << "readAndConstructPrincipal: "
600 0 : << "finished processing Event auxiliary.";
601 : }
602 0 : else if (msg_type_code == artdaq::NetMonHeader::MessageType::Subrun)
603 : {
604 0 : if (outSR == nullptr)
605 : {
606 0 : TLOG(TLVL_DEBUG + 43, "ArtdaqInputHelper") << "SubrunDataFragment for current Subrun received, returning Flush event";
607 0 : art::EventID const evid(art::EventID::flushEvent(inSR->subRunID()));
608 0 : outE = pm_.makeEventPrincipal(evid, currentTime);
609 : }
610 : }
611 0 : else if (msg_type_code == artdaq::NetMonHeader::MessageType::Run)
612 : {
613 0 : if (outR == nullptr)
614 : {
615 0 : TLOG(TLVL_DEBUG + 43, "ArtdaqInputHelper") << "RunDataFragment for current Run received, returning Flush subrun/event";
616 0 : art::EventID const evid(art::EventID::flushEvent(inR->runID()));
617 0 : outSR = pm_.makeSubRunPrincipal(evid.subRunID(), currentTime);
618 0 : outE = pm_.makeEventPrincipal(evid, currentTime);
619 : }
620 : }
621 0 : }
622 :
623 : template<typename U>
624 0 : bool art::ArtdaqInputHelper<U>::constructPrincipal(std::shared_ptr<ArtdaqEvent> eventPtr, art::RunPrincipal* const inR, art::SubRunPrincipal* const inSR, art::RunPrincipal*& outR, art::SubRunPrincipal*& outSR, art::EventPrincipal*& outE)
625 : {
626 : // We return false, indicating we're done reading, if:
627 : // 1) we did not obtain an event, because we timed out and were
628 : // configured NOT to keep trying after a timeout, or
629 : // 2) the event we read was the end-of-data marker: a null
630 : // pointer
631 0 : if (eventPtr->FirstFragmentType() == artdaq::Fragment::EndOfDataFragmentType)
632 : {
633 0 : TLOG(TLVL_DEBUG + 32, "ArtdaqInputHelper") << "Received shutdown message, returning false";
634 0 : shutdownMsgReceived_ = true;
635 0 : return false;
636 : }
637 :
638 0 : if (!eventPtr->header)
639 : {
640 0 : TLOG_ERROR("ArtdaqInputHelper") << "No RawEventHeader received, cannot construct principals!";
641 0 : shutdownMsgReceived_ = true;
642 0 : return false;
643 : }
644 :
645 : // Check the number of fragments in the RawEvent. If we have a single
646 : // fragment and that fragment is marked as EndRun or EndSubrun we'll create
647 : // the special principals for that.
648 0 : art::Timestamp currentTime = 0;
649 : timespec hi_res_time;
650 0 : int retcode = clock_gettime(CLOCK_REALTIME, &hi_res_time);
651 0 : TLOG(TLVL_DEBUG + 43, "ArtdaqInputHelper") << "hi_res_time tv_sec = " << hi_res_time.tv_sec
652 0 : << " tv_nsec = " << hi_res_time.tv_nsec << " (retcode = " << retcode << ")";
653 0 : if (retcode == 0)
654 : {
655 0 : currentTime = ((hi_res_time.tv_sec & 0xffffffff) << 32) | (hi_res_time.tv_nsec & 0xffffffff);
656 : }
657 : else
658 : {
659 0 : TLOG_ERROR("ArtdaqInputHelper")
660 0 : << "Unable to fetch a high-resolution time with clock_gettime for art::Event Timestamp. "
661 0 : << "The art::Event Timestamp will be zero for event " << eventPtr->header->event_id;
662 : }
663 :
664 : // make new run if inR is 0 or if the run has changed
665 0 : if (inR == nullptr || !inR->runID().isValid() || inR->run() != eventPtr->header->run_id)
666 : {
667 0 : TLOG(TLVL_DEBUG + 43, "ArtdaqInputHelper") << "Making run principal with run_id " << eventPtr->header->run_id;
668 0 : outR = pm_.makeRunPrincipal(eventPtr->header->run_id, currentTime);
669 : }
670 :
671 0 : if (eventPtr->FirstFragmentType() == artdaq::Fragment::EndOfRunFragmentType)
672 : {
673 0 : TLOG(TLVL_DEBUG + 43, "ArtdaqInputHelper") << "EndOfRunFragment received, returning Flush subrun/event";
674 0 : art::EventID const evid(art::EventID::flushEvent(outR != nullptr ? outR->runID() : inR->runID()));
675 0 : outSR = pm_.makeSubRunPrincipal(evid.subRunID(), currentTime);
676 0 : outE = pm_.makeEventPrincipal(evid, currentTime);
677 0 : return true;
678 : }
679 :
680 : // make new subrun if inSR is 0 or if the subrun has changed
681 0 : art::SubRunID subrun_check(eventPtr->header->run_id, eventPtr->header->subrun_id);
682 0 : if (inSR == nullptr || !inSR->subRunID().isValid() || subrun_check != inSR->subRunID())
683 : {
684 0 : TLOG(TLVL_DEBUG + 43, "ArtdaqInputHelper") << "Making subrun principal with subrun_id " << eventPtr->header->subrun_id;
685 0 : outSR = pm_.makeSubRunPrincipal(eventPtr->header->run_id, eventPtr->header->subrun_id, currentTime);
686 : }
687 :
688 0 : if (eventPtr->FirstFragmentType() == artdaq::Fragment::EndOfSubrunFragmentType)
689 : {
690 0 : TLOG(TLVL_DEBUG + 43, "ArtdaqInputHelper") << "EndOfSubrunFragment received, returning Flush event";
691 0 : art::EventID const evid(art::EventID::flushEvent(outSR != nullptr ? outSR->subRunID() : inSR->subRunID()));
692 0 : outE = pm_.makeEventPrincipal(evid, currentTime);
693 0 : return true;
694 : }
695 :
696 0 : TLOG(TLVL_DEBUG + 43, "ArtdaqInputHelper") << "Making event principal with event_id " << eventPtr->header->event_id;
697 0 : outE = pm_.makeEventPrincipal(eventPtr->header->run_id, eventPtr->header->subrun_id, eventPtr->header->event_id, currentTime);
698 0 : return true;
699 : }
700 :
701 : template<typename U>
702 : template<class T>
703 0 : void art::ArtdaqInputHelper<U>::readDataProducts(std::list<std::unique_ptr<TBufferFile>>& msgs, T* const& outPrincipal)
704 : {
705 0 : for (auto& msg : msgs)
706 : {
707 0 : ULong_t prd_cnt = 0;
708 : {
709 0 : TLOG(TLVL_DEBUG + 40, "ArtdaqInputHelper") << "readDataProducts: reading data product count ...";
710 0 : msg->ReadULong(prd_cnt);
711 0 : TLOG(TLVL_DEBUG + 40, "ArtdaqInputHelper") << "readDataProducts: product count: " << prd_cnt;
712 : }
713 : //
714 : // Read the data products.
715 : //
716 0 : for (ULong_t I = 0; I < prd_cnt; ++I)
717 : {
718 0 : std::unique_ptr<BranchKey> bk;
719 : {
720 0 : TLOG(TLVL_DEBUG + 40, "ArtdaqInputHelper") << "readDataProducts: Reading branch key.";
721 0 : bk.reset(ReadObjectAny<BranchKey>(msg, "art::BranchKey", "ArtdaqInputHelper::readDataProducts"));
722 : }
723 :
724 : #ifndef __OPTIMIZE__
725 0 : TLOG(TLVL_DEBUG + 41, "ArtdaqInputHelper") << "readDataProducts: got product class: '" << bk->friendlyClassName_ << "' modlbl: '"
726 0 : << bk->moduleLabel_ << "' instnm: '" << bk->productInstanceName_ << "' procnm: '"
727 0 : << bk->processName_;
728 : #endif
729 0 : ProductList::const_iterator iter;
730 : {
731 0 : TLOG(TLVL_DEBUG + 40, "ArtdaqInputHelper") << "readDataProducts: looking up product ...";
732 0 : iter = productList_->find(*bk);
733 0 : if (iter == productList_->end())
734 : {
735 0 : throw art::Exception(art::errors::ProductNotFound) // NOLINT(cert-err60-cpp)
736 : << "No product is registered for\n"
737 0 : << " process name: '" << bk->processName_ << "'\n"
738 0 : << " module label: '" << bk->moduleLabel_ << "'\n"
739 0 : << " product friendly class name: '" << bk->friendlyClassName_ << "'\n"
740 0 : << " product instance name: '" << bk->productInstanceName_ << "'\n";
741 : }
742 : }
743 : // Note: This must be a reference to the unique copy in
744 : // the master product registry!
745 0 : const BranchDescription& bd = iter->second;
746 0 : std::unique_ptr<EDProduct> prd;
747 : {
748 0 : TLOG(TLVL_DEBUG + 40, "ArtdaqInputHelper") << "readDataProducts: Reading product with wrapped name: " << bd.wrappedName()
749 0 : << ", TClass = " << static_cast<void*>(TClass::GetClass(bd.wrappedName().c_str()));
750 :
751 : // JCF, May-25-2016
752 : // Currently unclear why the templatized version of ReadObjectAny doesn't work here...
753 :
754 : // prd.reset(ReadObjectAny<EDProduct>(msg, bd.wrappedName()));
755 :
756 0 : void* p = msg->ReadObjectAny(TClass::GetClass(bd.wrappedName().c_str()));
757 0 : auto pp = reinterpret_cast<EDProduct*>(p); // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
758 :
759 0 : TLOG(TLVL_DEBUG + 40, "ArtdaqInputHelper") << "readDataProducts: After ReadObjectAny(prd): p=" << p << ", EDProduct::isPresent: " << pp->isPresent();
760 0 : prd.reset(pp);
761 0 : p = nullptr;
762 : }
763 0 : std::unique_ptr<const ProductProvenance> prdprov;
764 : {
765 0 : TLOG(TLVL_DEBUG + 40, "ArtdaqInputHelper") << "readDataProducts: Reading product provenance.";
766 0 : prdprov.reset(ReadObjectAny<ProductProvenance>(msg, "art::ProductProvenance", "ArtdaqInputHelper::readDataProducts"));
767 : }
768 :
769 : {
770 0 : TLOG(TLVL_DEBUG + 40, "ArtdaqInputHelper") << "readDataProducts: inserting product: class: '" << bd.friendlyClassName()
771 0 : << "' modlbl: '" << bd.moduleLabel() << "' instnm: '" << bd.productInstanceName()
772 0 : << "' procnm: '" << bd.processName() << "' id: '" << bd.productID() << "'";
773 0 : putInPrincipal(outPrincipal, std::move(prd), bd, std::move(prdprov));
774 : }
775 : }
776 : }
777 0 : }
778 :
779 : template<typename U>
780 0 : void art::ArtdaqInputHelper<U>::putInPrincipal(RunPrincipal* const& rp, std::unique_ptr<EDProduct>&& prd,
781 : const BranchDescription& bd,
782 : std::unique_ptr<const ProductProvenance>&& prdprov)
783 : {
784 0 : rp->put(bd, std::move(prdprov), std::move(prd), std::make_unique<RangeSet>(RangeSet::forRun(rp->runID())));
785 0 : }
786 :
787 : template<typename U>
788 0 : void art::ArtdaqInputHelper<U>::putInPrincipal(SubRunPrincipal* const& srp, std::unique_ptr<EDProduct>&& prd,
789 : const BranchDescription& bd,
790 : std::unique_ptr<const ProductProvenance>&& prdprov)
791 : {
792 0 : srp->put(bd, std::move(prdprov), std::move(prd), std::make_unique<RangeSet>(RangeSet::forSubRun(srp->subRunID())));
793 0 : }
794 :
795 : template<typename U>
796 0 : void art::ArtdaqInputHelper<U>::putInPrincipal(EventPrincipal* const& ep, std::unique_ptr<EDProduct>&& prd,
797 : const BranchDescription& bd,
798 : std::unique_ptr<const ProductProvenance>&& prdprov)
799 : {
800 0 : TLOG(TLVL_DEBUG + 42, "ArtdaqInputHelper") << "EventPrincipal size before put: " << ep->size();
801 :
802 0 : ep->put(bd, std::move(prdprov), std::move(prd), std::make_unique<RangeSet>(RangeSet::invalid()));
803 :
804 0 : TLOG(TLVL_DEBUG + 42, "ArtdaqInputHelper") << "EventPrincipal size after put: " << ep->size();
805 0 : }
806 :
807 : template<typename U>
808 0 : std::pair<bool, bool> art::ArtdaqInputHelper<U>::readFragments(std::unordered_map<artdaq::Fragment::type_t, std::unique_ptr<artdaq::Fragments>> const& eventMap, art::RunPrincipal* const theRun, art::SubRunPrincipal* const theSubRun, art::EventPrincipal* const theEvent)
809 : {
810 : // Now read in Fragments
811 0 : double fragmentLatency = 0;
812 0 : double fragmentLatencyMax = 0.0;
813 0 : size_t fragmentCount = 0;
814 :
815 0 : bool eventProductsRead = false;
816 0 : bool subrunProductsRead = false;
817 :
818 0 : art::ServiceHandle<ArtdaqFragmentNamingServiceInterface> translator;
819 :
820 : // insert the Fragments of each type into the EventPrincipal
821 0 : for (auto& fragmentTypePair : eventMap)
822 : {
823 0 : auto type_code = fragmentTypePair.first;
824 0 : if (artdaq::Fragment::isSystemFragmentType(type_code) && type_code != artdaq::Fragment::ContainerFragmentType && type_code != artdaq::Fragment::EmptyFragmentType)
825 : {
826 0 : if (type_code == artdaq::Fragment::EndOfRunFragmentType)
827 : {
828 0 : std::unordered_map<std::string, std::unique_ptr<std::vector<artdaq::ArtdaqMetadata>>> metadata_coll;
829 0 : for (auto& frag : *fragmentTypePair.second)
830 : {
831 0 : artdaq::MetadataFragment mf(frag);
832 0 : auto md = mf.get_metadata();
833 :
834 0 : std::pair<bool, std::string> instance_name_result =
835 : translator->GetInstanceNameForFragment(frag);
836 0 : std::string label = instance_name_result.second;
837 0 : if (!instance_name_result.first)
838 : {
839 0 : TLOG_WARNING("ArtdaqInputHelper")
840 0 : << "UnknownFragmentType: The product instance name mapping for fragment type \"" << static_cast<int>(type_code)
841 0 : << "\" is not known. Fragments of this "
842 0 : << "type will be stored in the event with an instance name of \"" << label << "\".";
843 : }
844 0 : if (!metadata_coll.count(label))
845 : {
846 0 : TLOG(TLVL_DEBUG + 44, "ArtdaqInputHelper") << "Creating output ArtdaqMetadata storage for label " << label;
847 0 : metadata_coll[label] = std::make_unique<std::vector<artdaq::ArtdaqMetadata>>();
848 : }
849 0 : TLOG(TLVL_DEBUG + 44, "ArtdaqInputHelper") << "Adding Fragment " << frag.fragmentID() << " to storage with label " << label << " (sz=" << metadata_coll[label]->size() + 1 << ")";
850 0 : metadata_coll[label]->push_back(md);
851 : }
852 0 : for (auto& type : metadata_coll)
853 : {
854 0 : TLOG(TLVL_DEBUG + 44, "ArtdaqInputHelper") << "Adding " << type.second->size() << " ArtdaqMetadatas with label " << type.first << " to Run.";
855 0 : put_product_in_principal(std::move(type.second), *theRun, pretend_module_name, type.first);
856 : }
857 0 : }
858 0 : else if (type_code == artdaq::Fragment::EndOfSubrunFragmentType)
859 : {
860 0 : std::unordered_map<std::string, std::unique_ptr<std::vector<artdaq::ArtdaqMetadata>>> metadata_coll;
861 0 : for (auto& frag : *fragmentTypePair.second)
862 : {
863 0 : artdaq::MetadataFragment mf(frag);
864 0 : auto md = mf.get_metadata();
865 :
866 0 : std::pair<bool, std::string> instance_name_result =
867 : translator->GetInstanceNameForFragment(frag);
868 0 : std::string label = instance_name_result.second;
869 0 : if (!instance_name_result.first)
870 : {
871 0 : TLOG_WARNING("ArtdaqInputHelper")
872 0 : << "UnknownFragmentType: The product instance name mapping for fragment type \"" << static_cast<int>(type_code)
873 0 : << "\" is not known. Fragments of this "
874 0 : << "type will be stored in the event with an instance name of \"" << label << "\".";
875 : }
876 0 : if (!metadata_coll.count(label))
877 : {
878 0 : TLOG(TLVL_DEBUG + 44, "ArtdaqInputHelper") << "Creating output ArtdaqMetadata storage for label " << label;
879 0 : metadata_coll[label] = std::make_unique<std::vector<artdaq::ArtdaqMetadata>>();
880 : }
881 0 : TLOG(TLVL_DEBUG + 44, "ArtdaqInputHelper") << "Adding Fragment " << frag.fragmentID() << " to storage with label " << label << " (sz=" << metadata_coll[label]->size() + 1 << ")";
882 0 : metadata_coll[label]->push_back(md);
883 : }
884 0 : for (auto& type : metadata_coll)
885 : {
886 0 : TLOG(TLVL_DEBUG + 44, "ArtdaqInputHelper") << "Adding " << type.second->size() << " ArtdaqMetadatas with label " << type.first << " to SubRun.";
887 0 : put_product_in_principal(std::move(type.second), *theSubRun, pretend_module_name, type.first);
888 0 : subrunProductsRead = true;
889 : }
890 0 : }
891 : else
892 : {
893 0 : TLOG(TLVL_DEBUG + 33, "ArtdaqInputHelper") << "Skipping system Fragment with type " << static_cast<int>(type_code) << " ( " << translator->GetInstanceNameForType(type_code) << " )";
894 : }
895 0 : continue;
896 0 : }
897 0 : TLOG(TLVL_DEBUG + 33, "ArtdaqInputHelper") << "type is " << static_cast<int>(type_code) << ", number of fragments is " << fragmentTypePair.second->size();
898 :
899 0 : std::unordered_map<std::string, std::unique_ptr<artdaq::Fragments>> derived_fragments;
900 0 : for (auto& frag : *fragmentTypePair.second)
901 : {
902 0 : TLOG(TLVL_DEBUG + 44, "ArtdaqInputHelper") << "Processing Fragment with ID " << frag.fragmentID();
903 0 : bytesRead += frag.sizeBytes();
904 0 : auto latency_s = frag.getLatency(true);
905 0 : double latency = latency_s.tv_sec + (latency_s.tv_nsec / 1000000000.0);
906 :
907 0 : fragmentLatency += latency;
908 0 : fragmentCount++;
909 0 : if (latency > fragmentLatencyMax) fragmentLatencyMax = latency;
910 :
911 0 : std::pair<bool, std::string> instance_name_result =
912 : translator->GetInstanceNameForFragment(frag);
913 0 : std::string label = instance_name_result.second;
914 0 : if (!instance_name_result.first)
915 : {
916 0 : TLOG_WARNING("ArtdaqInputHelper")
917 0 : << "UnknownFragmentType: The product instance name mapping for fragment type \"" << static_cast<int>(type_code)
918 0 : << "\" is not known. Fragments of this "
919 0 : << "type will be stored in the event with an instance name of \"" << label << "\".";
920 : }
921 0 : if (!derived_fragments.count(label))
922 : {
923 0 : TLOG(TLVL_DEBUG + 44, "ArtdaqInputHelper") << "Creating output Fragment storage for label " << label;
924 0 : derived_fragments[label] = std::make_unique<artdaq::Fragments>();
925 : }
926 0 : TLOG(TLVL_DEBUG + 44, "ArtdaqInputHelper") << "Adding Fragment " << frag.fragmentID() << " to storage with label " << label << " (sz=" << derived_fragments[label]->size() + 1 << ")";
927 0 : derived_fragments[label]->emplace_back(std::move(frag));
928 : }
929 0 : for (auto& type : derived_fragments)
930 : {
931 0 : TLOG(TLVL_DEBUG + 44, "ArtdaqInputHelper") << "Adding " << type.second->size() << " Fragments with label " << type.first << " to event.";
932 0 : put_product_in_principal(std::move(type.second), *theEvent, pretend_module_name, type.first);
933 0 : eventProductsRead = true;
934 : }
935 : }
936 :
937 0 : if (metricMan)
938 : {
939 0 : metricMan->sendMetric("bytesRead", bytesRead, "B", 3, artdaq::MetricMode::LastPoint);
940 :
941 0 : metricMan->sendMetric("ArtdaqInputHelper Latency", fragmentLatency / fragmentCount, "s", 4, artdaq::MetricMode::Average);
942 0 : metricMan->sendMetric("ArtdaqInputHelper Maximum Latency", fragmentLatencyMax, "s", 4, artdaq::MetricMode::Maximum);
943 : }
944 :
945 0 : return std::make_pair(subrunProductsRead, eventProductsRead);
946 : }
947 :
948 : template<typename U>
949 0 : bool art::ArtdaqInputHelper<U>::readNext(art::RunPrincipal* const inR, art::SubRunPrincipal* const inSR,
950 : art::RunPrincipal*& outR, art::SubRunPrincipal*& outSR, art::EventPrincipal*& outE)
951 : {
952 0 : TLOG(TLVL_DEBUG + 43, "ArtdaqInputHelper") << "Begin: ArtdaqInputHelper::readNext";
953 :
954 0 : auto read_start_time = std::chrono::steady_clock::now();
955 :
956 0 : std::shared_ptr<ArtdaqEvent> eventMap = communicationWrapper_.receiveMessages();
957 0 : auto got_event_time = std::chrono::steady_clock::now();
958 :
959 0 : if (eventMap == nullptr)
960 : {
961 0 : TLOG(TLVL_ERROR, "ArtdaqInputHelper") << "No Fragments received! Aborting...";
962 0 : shutdownMsgReceived_ = true;
963 0 : TLOG(TLVL_DEBUG + 45, "ArtdaqInputHelper") << "End: ArtdaqInputHelper::readNext";
964 0 : return false;
965 : }
966 :
967 0 : if (eventMap->FirstFragmentType() == artdaq::Fragment::EndOfDataFragmentType)
968 : {
969 0 : TLOG(TLVL_ERROR, "ArtdaqInputHelper") << "Shutdown message received!";
970 0 : shutdownMsgReceived_ = true;
971 0 : TLOG(TLVL_DEBUG + 45, "ArtdaqInputHelper") << "End: ArtdaqInputHelper::readNext";
972 0 : return false;
973 : }
974 :
975 0 : if (eventMap->FirstFragmentType() == artdaq::Fragment::InitFragmentType)
976 : {
977 : #if CAN_REINIT
978 : TLOG(TLVL_INFO, "ArtdaqInputHelper") << "Additional Init Message received! Attempting to register new products...";
979 : readInitMessage();
980 : #else
981 0 : TLOG(TLVL_WARNING, "ArtdaqInputHelper") << "Received additional Init Message! Check init_fragment_count configuration!";
982 : #endif
983 0 : TLOG(TLVL_DEBUG + 45, "ArtdaqInputHelper") << "End: ArtdaqInputHelper::readNext";
984 0 : return true;
985 : }
986 :
987 0 : if (fragmentsOnlyMode_)
988 : {
989 0 : if (eventMap->fragments.count(artdaq::Fragment::DataFragmentType) || eventMap->fragments.count(artdaq::Fragment::RunDataFragmentType) || eventMap->fragments.count(artdaq::Fragment::SubrunDataFragmentType))
990 : {
991 0 : TLOG(TLVL_DEBUG + 43, "ArtdaqInputHelper") << "ArtdaqInputHelper::readNext unexpectedly got a message with a DataFragment. This art Event will NOT be reconstructed!";
992 : }
993 :
994 0 : auto firstFragmentType = eventMap->FirstFragmentType();
995 0 : TLOG(TLVL_DEBUG + 32, "ArtdaqInputHelper") << "First Fragment type is " << static_cast<int>(firstFragmentType);
996 0 : if (constructPrincipal(eventMap, inR, inSR, outR, outSR, outE))
997 : {
998 0 : readFragments(eventMap->fragments, outR ? outR : inR, outSR ? outSR : inSR, outE);
999 : }
1000 : }
1001 : else
1002 : {
1003 0 : std::list<std::unique_ptr<TBufferFile>> msgs;
1004 0 : if (eventMap->fragments.count(artdaq::Fragment::RunDataFragmentType))
1005 0 : for (auto& dataFrag : *(eventMap->fragments[artdaq::Fragment::RunDataFragmentType]))
1006 : {
1007 0 : if (!dataFrag.hasMetadata()) continue;
1008 0 : auto header = dataFrag.metadata<artdaq::NetMonHeader>();
1009 0 : msgs.emplace_back(new TBufferFile(TBuffer::kRead, header->data_length, dataFrag.dataBegin(), kFALSE, nullptr));
1010 : }
1011 0 : if (eventMap->fragments.count(artdaq::Fragment::SubrunDataFragmentType))
1012 0 : for (auto& dataFrag : *(eventMap->fragments[artdaq::Fragment::SubrunDataFragmentType]))
1013 : {
1014 0 : if (!dataFrag.hasMetadata()) continue;
1015 0 : auto header = dataFrag.metadata<artdaq::NetMonHeader>();
1016 0 : msgs.emplace_back(new TBufferFile(TBuffer::kRead, header->data_length, dataFrag.dataBegin(), kFALSE, nullptr));
1017 : }
1018 0 : if (eventMap->fragments.count(artdaq::Fragment::DataFragmentType))
1019 0 : for (auto& dataFrag : *(eventMap->fragments[artdaq::Fragment::DataFragmentType]))
1020 : {
1021 0 : if (!dataFrag.hasMetadata()) continue;
1022 0 : auto header = dataFrag.metadata<artdaq::NetMonHeader>();
1023 0 : msgs.emplace_back(new TBufferFile(TBuffer::kRead, header->data_length, dataFrag.dataBegin(), kFALSE, nullptr));
1024 : }
1025 :
1026 : //
1027 : // Read message type code.
1028 : //
1029 0 : artdaq::NetMonHeader::MessageType msg_type_code = artdaq::NetMonHeader::MessageType::Invalid;
1030 0 : ULong_t msg_type_code_tmp = 0;
1031 0 : for (auto& msg : msgs)
1032 : {
1033 0 : TLOG(TLVL_DEBUG + 43, "ArtdaqInputHelper") << "ArtdaqInputHelper::readNext: "
1034 0 : << "getting message type code ...";
1035 0 : msg->ReadULong(msg_type_code_tmp);
1036 0 : TLOG(TLVL_DEBUG + 43, "ArtdaqInputHelper") << "ArtdaqInputHelper::readNext: "
1037 0 : << "message type: " << msg_type_code_tmp;
1038 :
1039 0 : if (msg_type_code == artdaq::NetMonHeader::MessageType::Invalid)
1040 0 : msg_type_code = static_cast<artdaq::NetMonHeader::MessageType>(msg_type_code_tmp);
1041 0 : else if (msg_type_code != static_cast<artdaq::NetMonHeader::MessageType>(msg_type_code_tmp))
1042 : {
1043 0 : TLOG(TLVL_ERROR, "ArtdaqInputHelper") << "ArtdaqInputHelper::readNext: Received conflicting message type codes! Aborting...";
1044 :
1045 0 : shutdownMsgReceived_ = true;
1046 0 : TLOG(TLVL_DEBUG + 45, "ArtdaqInputHelper") << "End: ArtdaqInputHelper::readNext";
1047 0 : return false;
1048 : }
1049 : }
1050 :
1051 0 : for (auto& msg : msgs)
1052 : {
1053 0 : readAndConstructPrincipal(msg, msg_type_code, inR, inSR, outR, outSR, outE);
1054 : }
1055 : //
1056 : // Read per-event metadata needed to construct principal.
1057 : //
1058 0 : if (msg_type_code == artdaq::NetMonHeader::MessageType::Run)
1059 : {
1060 : // EndRun message.
1061 : // FIXME: We need to merge these into the input RunPrincipal.
1062 0 : readDataProducts(msgs, outR ? outR : inR);
1063 : }
1064 0 : else if (msg_type_code == artdaq::NetMonHeader::MessageType::Subrun)
1065 : {
1066 : // FIXME: We need to merge these into the input SubRunPrincipal.
1067 0 : readDataProducts(msgs, outSR ? outSR : inSR);
1068 : }
1069 0 : else if (msg_type_code == artdaq::NetMonHeader::MessageType::Event)
1070 : {
1071 : // Event message.
1072 0 : readDataProducts(msgs, outE);
1073 :
1074 0 : if (eventMap->fragments.size() > 1)
1075 : {
1076 0 : readFragments(eventMap->fragments, outR ? outR : inR, outSR ? outSR : inSR, outE);
1077 : }
1078 : }
1079 : else
1080 : {
1081 : // Did not have a valid message, try again
1082 0 : return false;
1083 : }
1084 0 : }
1085 :
1086 0 : if (outE != nullptr)
1087 : {
1088 0 : auto artHdrPtr = std::make_unique<artdaq::detail::RawEventHeader>();
1089 0 : auto daqHdrPtr = eventMap->header;
1090 :
1091 0 : if (daqHdrPtr != nullptr)
1092 : {
1093 0 : memcpy(artHdrPtr.get(), daqHdrPtr.get(), sizeof(artdaq::detail::RawEventHeader));
1094 0 : put_product_in_principal(std::move(artHdrPtr), *outE, pretend_module_name, "RawEventHeader");
1095 : }
1096 0 : }
1097 :
1098 0 : auto read_finish_time = std::chrono::steady_clock::now();
1099 0 : TLOG(TLVL_DEBUG + 43, "ArtdaqInputHelper") << "readNext: bytesRead=" << bytesRead
1100 0 : << " metricMan=" << static_cast<void*>(metricMan.get());
1101 0 : if (metricMan)
1102 : {
1103 0 : metricMan->sendMetric("Avg Processing Time", artdaq::TimeUtils::GetElapsedTime(last_read_time, read_start_time),
1104 : "s", 2, artdaq::MetricMode::Average);
1105 0 : metricMan->sendMetric("Avg Input Wait Time", artdaq::TimeUtils::GetElapsedTime(read_start_time, got_event_time),
1106 : "s", 3, artdaq::MetricMode::Average);
1107 0 : metricMan->sendMetric("Avg Read Time", artdaq::TimeUtils::GetElapsedTime(got_event_time, read_finish_time), "s",
1108 : 3, artdaq::MetricMode::Average);
1109 : }
1110 :
1111 0 : TLOG(TLVL_DEBUG + 48, "ArtdaqInputHelper") << "End: ArtdaqInputHelper::readNext ret=" << std::boolalpha << (outR || outSR || outE);
1112 0 : last_read_time = std::chrono::steady_clock::now();
1113 0 : return outR || outSR || outE;
1114 0 : }
1115 :
1116 : #endif // ARTDAQ_ARTDAQ_ARTMODULES_ARTDAQINPUTHELPER_HH_
|