Line data Source code
1 : #ifndef ARTDAQ_ARTDAQ_ARTMODULES_ARTDAQOUTPUT_HH_
2 : #define ARTDAQ_ARTDAQ_ARTMODULES_ARTDAQOUTPUT_HH_
3 :
4 : #include "TRACE/tracemf.h" // Pre-empt TRACE/trace.h from Fragment.hh.
5 : #include "artdaq-core/Data/Fragment.hh"
6 :
7 : #include "artdaq-core/Data/RawEvent.hh"
8 : #include "artdaq-core/Data/detail/ParentageMap.hh"
9 : #include "artdaq/DAQdata/Globals.hh"
10 : #include "artdaq/DAQdata/NetMonHeader.hh"
11 :
12 : #include "art/Framework/Core/OutputModule.h"
13 : #include "art/Framework/Principal/EventPrincipal.h"
14 : #include "art/Framework/Principal/OutputHandle.h"
15 : #include "art/Framework/Principal/RunPrincipal.h"
16 : #include "art/Framework/Principal/SubRunPrincipal.h"
17 : #include "art/Persistency/Provenance/ModuleContext.h"
18 : #include "art/Persistency/Provenance/ProcessHistoryRegistry.h"
19 : #include "art_root_io/setup.h"
20 :
21 : #include "canvas/Persistency/Common/WrappedTypeID.h"
22 : #include "canvas/Persistency/Provenance/BranchDescription.h"
23 : #include "canvas/Persistency/Provenance/BranchKey.h"
24 : #include "canvas/Persistency/Provenance/ParentageRegistry.h"
25 : #include "canvas/Persistency/Provenance/ProcessConfiguration.h"
26 : #include "canvas/Persistency/Provenance/ProcessConfigurationID.h"
27 : #include "canvas/Persistency/Provenance/ProcessHistoryID.h"
28 : #include "canvas/Persistency/Provenance/ProductList.h"
29 : #include "canvas/Persistency/Provenance/ProductProvenance.h"
30 : #include "canvas/Persistency/Provenance/RunAuxiliary.h"
31 : #include "canvas/Persistency/Provenance/SubRunAuxiliary.h"
32 : #include "canvas/Utilities/Exception.h"
33 : #include "fhiclcpp/ParameterSet.h"
34 : #include "fhiclcpp/ParameterSetRegistry.h"
35 :
36 : #include <TBufferFile.h>
37 : #include <TClass.h>
38 : #include <TList.h>
39 : #include <TStreamerInfo.h>
40 :
41 : #include <unistd.h>
42 : #include <algorithm>
43 : #include <iomanip>
44 : #include <iostream>
45 : #include <iterator>
46 : #include <memory>
47 : #include <mutex>
48 : #include <sstream>
49 : #include <string>
50 : #include <vector>
51 :
52 : #define TLVL_ENTER_EXIT 33
53 : #define TLVL_SENDINIT 39
54 : #define TLVL_SENDINIT_VERBOSE1 40
55 : #define TLVL_SENDINIT_VERBOSE2 41
56 : #define TLVL_WRITEDATAPRODUCTS 42
57 : #define TLVL_WRITEDATAPRODUCTS_VERBOSE 43
58 : #define TLVL_WRITE 44
59 : #define TLVL_WRITERUN 45
60 : #define TLVL_WRITERUN_VERBOSE 46
61 : #define TLVL_WRITESUBRUN 47
62 : #define TLVL_WRITESUBRUN_VERBOSE 48
63 : #define TLVL_EXTRACTPRODUCTS 49
64 : #define TLVL_EXTRACTPRODUCTS_VERBOSE 50
65 : #define TLVL_BEGINJOB 51
66 : #define TLVL_BEGINJOB_VERBOSE 52
67 :
68 : namespace art {
69 : class ArtdaqOutput;
70 : } // namespace art
71 :
72 : static artdaq::FragmentPtr outputFrag = nullptr;
73 : static std::mutex outputFragMutex;
74 0 : inline char* Fragment_ReAllocChar(char* dataPtr, size_t size, size_t /*oldsize*/)
75 : {
76 0 : if (outputFrag != nullptr && dataPtr == reinterpret_cast<char*>(outputFrag->dataBegin())) // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
77 : {
78 0 : outputFrag->resizeBytes(size);
79 :
80 0 : return reinterpret_cast<char*>(outputFrag->dataBegin()); // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
81 : }
82 0 : return nullptr;
83 : }
84 :
85 : /// <summary>
86 : /// This is the base class for artdaq OutputModules, providing the serialization interface for art Events.
87 : /// </summary>
88 : class art::ArtdaqOutput : public art::OutputModule
89 : {
90 : public:
91 : /// <summary>
92 : /// ArtdaqOutput Constructor
93 : /// </summary>
94 : /// <param name="ps">ParameterSet used to configure art::OutputModule</param>
95 0 : explicit ArtdaqOutput(fhicl::ParameterSet const& ps)
96 0 : : OutputModule(ps), productList_(), raw_data_label_(ps.get<std::string>("raw_data_label", "daq"))
97 : {
98 0 : root::setup();
99 0 : }
100 :
101 : /// <summary>
102 : /// Destructor
103 : /// </summary>
104 0 : virtual ~ArtdaqOutput() = default;
105 :
106 : protected:
107 : /// <summary>
108 : /// Perform actions necessary for opening files. No-op, but derived classes may override
109 : /// </summary>
110 0 : virtual void openFile(FileBlock const&)
111 : {
112 0 : TLOG(TLVL_ENTER_EXIT, "ArtdaqOutput") << "Begin/End: ArtdaqOutput::openFile(const FileBlock&)";
113 0 : }
114 :
115 : /// <summary>
116 : /// Perform actions necessary for closing files. No-op, but derived classes may override
117 : /// </summary>
118 0 : virtual void closeFile() { TLOG(TLVL_ENTER_EXIT, "ArtdaqOutput") << "Begin/End: ArtdaqOutput::closeFile()"; }
119 :
120 : /// <summary>
121 : /// Perform actions nesessary after closing the input file. No-op, but derived classes may override
122 : /// </summary>
123 0 : virtual void respondToCloseInputFile(FileBlock const&)
124 : {
125 0 : TLOG(TLVL_ENTER_EXIT, "ArtdaqOutput") << "Begin/End: ArtdaqOutput::"
126 0 : "respondToCloseOutputFiles(FileBlock const&)";
127 0 : }
128 :
129 : /// <summary>
130 : /// Perform actions necessary after closing the output file(s). No-op, but derived classes may override
131 : /// </summary>
132 0 : virtual void respondToCloseOutputFiles(FileBlock const&)
133 : {
134 0 : TLOG(TLVL_ENTER_EXIT, "ArtdaqOutput") << "Begin/End: ArtdaqOutput::"
135 0 : "respondToCloseOutputFiles(FileBlock const&)";
136 0 : }
137 :
138 : virtual void beginJob();
139 :
140 : /// <summary>
141 : /// Perform End-of-Job actions. No-op, but derived classes may override
142 : /// </summary>
143 0 : virtual void endJob()
144 : {
145 0 : TLOG(TLVL_ENTER_EXIT, "ArtdaqOutput") << "Begin/End: ArtdaqOutput::endJob()";
146 0 : }
147 :
148 : /// <summary>
149 : /// Perform Begin Run actions. Derived classes should implement beginRun_ instead.
150 : /// </summary>
151 : /// <param name="rp">RunPrincipal of new run</param>
152 0 : void beginRun(RunPrincipal const& rp) final
153 : {
154 0 : beginRun_(rp);
155 0 : }
156 : /// <summary>
157 : /// Perform Begin Run actions. No-op, but derived classes may override
158 : /// </summary>
159 0 : virtual void beginRun_(RunPrincipal const&) {}
160 :
161 : /// <summary>
162 : /// Perform Begin SubRun actions. Derived classes should implement beginSubRun_ instead.
163 : /// </summary>
164 : /// <param name="srp">SubRunPrincipal of new subrun</param>
165 0 : void beginSubRun(SubRunPrincipal const& srp) final
166 : {
167 0 : beginSubRun_(srp);
168 0 : }
169 : /// <summary>
170 : /// Perform Begin SubRun actions. No-op, but derived classes may override
171 : /// </summary>
172 0 : virtual void beginSubRun_(SubRunPrincipal const&) {}
173 :
174 : /// <summary>
175 : /// Perform actions for each event. Derived classes should implement event_ instead.
176 : /// </summary>
177 : /// <param name="ep">EventPrincipal of event</param>
178 0 : void event(EventPrincipal const& ep) final
179 : {
180 0 : event_(ep);
181 0 : }
182 : /// <summary>
183 : /// Perform actions for each event. No-op, but derived classes may override
184 : /// </summary>
185 0 : virtual void event_(EventPrincipal const&) {}
186 :
187 : /// <summary>
188 : /// Write an EventPrincipal to TBufferFile and send
189 : /// </summary>
190 : /// <param name="ep">EventPrincipal to write</param>
191 : void write(EventPrincipal& ep) final;
192 :
193 : /// <summary>
194 : /// Write a RunPrincipal to TBufferFile and send
195 : /// </summary>
196 : /// <param name="rp">RunPrincipal to write</param>
197 : void writeRun(RunPrincipal& rp) final;
198 :
199 : /// <summary>
200 : /// Write a SubRunPrincipal to TBufferFile and send
201 : /// </summary>
202 : /// <param name="srp">SubRunPrincipal to write</param>
203 : void writeSubRun(SubRunPrincipal& srp) final;
204 :
205 : /// <summary>
206 : /// Extract the data products from a Principal and write them to the TBufferFile
207 : /// </summary>
208 : /// <param name="msg">Output TBufferFile</param>
209 : /// <param name="principal">Principal from which to extract products</param>
210 : /// <param name="bkv">Branch Keys for data products</param>
211 : void writeDataProducts(std::unique_ptr<TBufferFile>& msg, const Principal& principal, std::vector<BranchKey*>& bkv);
212 :
213 : /// <summary>
214 : /// Send an init message downstream.
215 : /// </summary>
216 : void send_init_message();
217 :
218 : /// <summary>
219 : /// Send the serialized art Event downstream. Artdaq output modules should define this function.
220 : /// </summary>
221 : /// <param name="msg">Serialized art Event</param>
222 : virtual void SendMessage(artdaq::FragmentPtr& msg) = 0;
223 :
224 : private:
225 : ArtdaqOutput(ArtdaqOutput const&) = delete;
226 : ArtdaqOutput(ArtdaqOutput&&) = delete;
227 : ArtdaqOutput& operator=(ArtdaqOutput const&) = delete;
228 : ArtdaqOutput& operator=(ArtdaqOutput&&) = delete;
229 :
230 : bool initMsgSent_{false};
231 : ProductList productList_;
232 : size_t last_fragment_size_{10};
233 : artdaq::Fragment::sequence_id_t last_sequence_id_{0};
234 : artdaq::Fragment::timestamp_t last_timestamp_{0};
235 : std::string raw_data_label_;
236 :
237 0 : std::unique_ptr<TBufferFile> prepareMessage(artdaq::Fragment::sequence_id_t seqID, artdaq::Fragment::timestamp_t ts, artdaq::Fragment::type_t type)
238 : {
239 0 : artdaq::NetMonHeader hdr{};
240 0 : outputFragMutex.lock();
241 0 : outputFrag = std::make_unique<artdaq::Fragment>(last_fragment_size_, seqID, my_rank, type, hdr, ts);
242 0 : auto msg = std::make_unique<TBufferFile>(TBuffer::kWrite, last_fragment_size_ * sizeof(artdaq::RawDataType), outputFrag->dataBegin(), kFALSE, &Fragment_ReAllocChar);
243 0 : msg->SetWriteMode();
244 :
245 0 : if (seqID > last_sequence_id_) last_sequence_id_ = seqID;
246 0 : if (ts > last_timestamp_) last_timestamp_ = ts;
247 :
248 0 : return msg;
249 0 : }
250 :
251 0 : void sendMessage(std::unique_ptr<TBufferFile>& msg)
252 : {
253 0 : artdaq::NetMonHeader hdr{};
254 0 : hdr.data_length = static_cast<uint64_t>(msg->Length());
255 0 : outputFrag->updateMetadata(hdr);
256 0 : outputFrag->resizeBytes(hdr.data_length);
257 0 : last_fragment_size_ = std::ceil(msg->Length() / static_cast<double>(sizeof(artdaq::RawDataType)));
258 :
259 0 : SendMessage(outputFrag);
260 0 : outputFragMutex.unlock();
261 0 : }
262 : };
263 :
264 0 : inline void art::ArtdaqOutput::send_init_message()
265 : {
266 0 : TLOG(TLVL_ENTER_EXIT, "ArtdaqOutput") << "Begin: ArtdaqOutput::send_init_message()";
267 : //
268 : // Get the classes we will need.
269 : //
270 : // static TClass* string_class = TClass::GetClass("std::string");
271 : // if (string_class == nullptr) {
272 : // throw art::Exception(art::errors::DictionaryNotFound) <<
273 : // "ArtdaqOutput static send_init_message(): "
274 : // "Could not get TClass for std::string!";
275 : //}
276 0 : static TClass* product_list_class = TClass::GetClass("std::map<art::BranchKey,art::BranchDescription>");
277 0 : if (product_list_class == nullptr)
278 : {
279 0 : throw art::Exception(art::errors::DictionaryNotFound) << "ArtdaqOutput::send_init_message(): " // NOLINT(cert-err60-cpp)
280 : "Could not get TClass for "
281 0 : "map<art::BranchKey,art::BranchDescription>!";
282 : }
283 : // typedef std::map<const ProcessHistoryID,ProcessHistory> ProcessHistoryMap;
284 : // TClass* process_history_map_class = TClass::GetClass(
285 : // "std::map<const art::ProcessHistoryID,art::ProcessHistory>");
286 : // FIXME: Replace the "2" here with a use of the proper enum value!
287 0 : static TClass* process_history_map_class = TClass::GetClass("std::map<const art::Hash<2>,art::ProcessHistory>");
288 0 : if (process_history_map_class == nullptr)
289 : {
290 0 : throw art::Exception(art::errors::DictionaryNotFound) << "ArtdaqOutput::send_init_message(): " // NOLINT(cert-err60-cpp)
291 : "Could not get class for "
292 0 : "std::map<const art::Hash<2>,art::ProcessHistory>!";
293 : }
294 : // static TClass* parentage_map_class = TClass::GetClass(
295 : // "std::map<const art::ParentageID,art::Parentage>");
296 0 : static TClass* parentage_map_class = TClass::GetClass("art::ParentageMap");
297 0 : if (parentage_map_class == nullptr)
298 : {
299 0 : throw art::Exception(art::errors::DictionaryNotFound) << "ArtdaqOutput::send_init_message(): " // NOLINT(cert-err60-cpp)
300 0 : "Could not get class for ParentageMap.";
301 : }
302 0 : TLOG(TLVL_SENDINIT, "ArtdaqOutput") << "parentage_map_class: " << static_cast<void*>(parentage_map_class);
303 :
304 : //
305 : // Construct and send the init message.
306 : //
307 0 : auto msg = prepareMessage(0, artdaq::Globals::my_art_id_, artdaq::Fragment::InitFragmentType);
308 : //
309 : // Stream the message type code.
310 : //
311 0 : TLOG(TLVL_SENDINIT, "ArtdaqOutput") << "ArtdaqOutput::send_init_message(): Streaming message type code ...";
312 0 : msg->WriteULong(1);
313 0 : TLOG(TLVL_SENDINIT, "ArtdaqOutput") << "ArtdaqOutput::send_init_message(): Finished streaming message type code.";
314 :
315 : //
316 : // Stream Class info
317 : //
318 : // ELF: 6/11/2019: This is being done so that if the receiver is a newer version of art/ROOT, it can still understand our version.
319 0 : TList infos;
320 : std::vector<std::string> classNames{"std::map<art::BranchKey,art::BranchDescription>", "std::map<const art::Hash<2>,art::ProcessHistory>", "art::ParentageMap",
321 0 : "art::BranchKey", "art::ProductProvenance", "art::RunAuxiliary", "art::SubRunAuxiliary", "art::EventAuxiliary"};
322 0 : for (auto& className : classNames)
323 : {
324 0 : TClass* class_ptr = TClass::GetClass(className.c_str());
325 0 : if (class_ptr == nullptr)
326 : {
327 0 : throw art::Exception(art::errors::DictionaryNotFound) << "ArtdaqOutput::send_init_message: Could not get TClass for " << className << "!"; // NOLINT(cert-err60-cpp)
328 : }
329 0 : infos.Add(class_ptr->GetStreamerInfo());
330 : }
331 0 : msg->WriteObject(&infos);
332 :
333 : //
334 : // Stream the ParameterSetRegistry.
335 : //
336 0 : ULong_t ps_cnt = fhicl::ParameterSetRegistry::size();
337 0 : TLOG(TLVL_SENDINIT, "ArtdaqOutput") << "ArtdaqOutput::send_init_message(): parameter set count: " << ps_cnt;
338 0 : msg->WriteULong(ps_cnt);
339 0 : TLOG(TLVL_SENDINIT, "ArtdaqOutput") << "ArtdaqOutput::send_init_message(): Streaming parameter sets ...";
340 0 : for (auto I = std::begin(fhicl::ParameterSetRegistry::get()), E = std::end(fhicl::ParameterSetRegistry::get());
341 0 : I != E; ++I)
342 : {
343 0 : TLOG(TLVL_SENDINIT, "ArtdaqOutput") << "Pset ID " << I->first << ": " << I->second.to_string();
344 0 : std::string pset_str = I->second.to_string();
345 : // msg->WriteObjectAny(&pset_str, string_class);
346 0 : msg->WriteStdString(pset_str);
347 0 : }
348 0 : TLOG(TLVL_SENDINIT, "ArtdaqOutput") << "ArtdaqOutput::send_init_message(): Finished streaming parameter sets.";
349 :
350 : //
351 : // Stream the MasterProductRegistry.
352 : //
353 0 : TLOG(TLVL_SENDINIT, "ArtdaqOutput") << "ArtdaqOutput::send_init_message(): Streaming Product List sz=" << productList_.size() << "...";
354 0 : msg->WriteObjectAny(&productList_, product_list_class);
355 0 : TLOG(TLVL_SENDINIT, "ArtdaqOutput") << "ArtdaqOutput::send_init_message(): Finished streaming Product List.";
356 :
357 0 : art::ProcessHistoryMap phr;
358 0 : for (auto const& pr : art::ProcessHistoryRegistry::get())
359 : {
360 0 : phr.emplace(pr);
361 : }
362 : //
363 : // Dump the ProcessHistoryRegistry.
364 : //
365 0 : TLOG(TLVL_SENDINIT_VERBOSE2, "ArtdaqOutput") << "ArtdaqOutput::send_init_message(): Dumping ProcessHistoryRegistry ...";
366 : // typedef std::map<const ProcessHistoryID,ProcessHistory>
367 : // ProcessHistoryMap;
368 0 : TLOG(TLVL_SENDINIT_VERBOSE2, "ArtdaqOutput") << "ArtdaqOutput::send_init_message(): phr: size: " << phr.size();
369 0 : for (auto I = phr.begin(), E = phr.end(); I != E; ++I)
370 : {
371 0 : std::ostringstream OS;
372 0 : I->first.print(OS);
373 0 : TLOG(TLVL_SENDINIT_VERBOSE2, "ArtdaqOutput") << "ArtdaqOutput::send_init_message(): phr: id: '" << OS.str() << "'";
374 0 : }
375 : //
376 : // Stream the ProcessHistoryRegistry.
377 : //
378 0 : TLOG(TLVL_SENDINIT, "ArtdaqOutput") << "ArtdaqOutput::send_init_message(): Streaming ProcessHistoryRegistry ...";
379 : // typedef std::map<const ProcessHistoryID,ProcessHistory>
380 : // ProcessHistoryMap;
381 0 : const art::ProcessHistoryMap& phm = phr;
382 0 : TLOG(TLVL_SENDINIT, "ArtdaqOutput") << "ArtdaqOutput::send_init_message(): phm: size: " << phm.size();
383 0 : msg->WriteObjectAny(&phm, process_history_map_class);
384 0 : TLOG(TLVL_SENDINIT, "ArtdaqOutput") << "ArtdaqOutput::send_init_message(): Finished streaming ProcessHistoryRegistry.";
385 :
386 : //
387 : // Stream the ParentageRegistry.
388 : //
389 0 : TLOG(TLVL_SENDINIT, "ArtdaqOutput") << "ArtdaqOutput::send_init_message(): Streaming ParentageRegistry ..." << static_cast<void*>(parentage_map_class);
390 0 : art::ParentageMap parentageMap{};
391 0 : for (auto const& pr : art::ParentageRegistry::get())
392 : {
393 0 : parentageMap.emplace(pr.first, pr.second);
394 : }
395 :
396 0 : msg->WriteObjectAny(&parentageMap, parentage_map_class);
397 :
398 0 : TLOG(TLVL_SENDINIT, "ArtdaqOutput") << "ArtdaqOutput::send_init_message(): Finished streaming ParentageRegistry.";
399 :
400 0 : TLOG(TLVL_SENDINIT, "ArtdaqOutput") << "ArtdaqOutput::send_init_message(): Sending init message";
401 0 : sendMessage(msg);
402 0 : TLOG(TLVL_SENDINIT, "ArtdaqOutput") << "ArtdaqOutput::send_init_message(): Done sending init message, sleeping to ensure delivery";
403 0 : usleep(1000000); // Sleep to allow peer init messages to be sent/received
404 0 : TLOG(TLVL_ENTER_EXIT, "ArtdaqOutput") << "ArtdaqOutput::send_init_message(): END";
405 0 : initMsgSent_ = true;
406 0 : }
407 :
408 0 : inline void art::ArtdaqOutput::writeDataProducts(std::unique_ptr<TBufferFile>& msg, const Principal& principal, std::vector<BranchKey*>& bkv)
409 : {
410 0 : TLOG(TLVL_WRITEDATAPRODUCTS, "ArtdaqOutput") << "Begin: ArtdaqOutput::writeDataProducts(...)";
411 : //
412 : // Fetch the class dictionaries we need for
413 : // writing out the data products.
414 : //
415 0 : static TClass* branch_key_class = TClass::GetClass("art::BranchKey");
416 0 : if (branch_key_class == nullptr)
417 : {
418 0 : throw art::Exception(art::errors::DictionaryNotFound) << "ArtdaqOutput::writeDataProducts(...): " // NOLINT(cert-err60-cpp)
419 0 : "Could not get TClass for art::BranchKey!";
420 : }
421 0 : static TClass* prdprov_class = TClass::GetClass("art::ProductProvenance");
422 0 : if (prdprov_class == nullptr)
423 : {
424 0 : throw art::Exception(art::errors::DictionaryNotFound) << "ArtdaqOutput::writeDataProducts(...): " // NOLINT(cert-err60-cpp)
425 0 : "Could not get TClass for art::ProductProvenance!";
426 : }
427 :
428 : //
429 : // Calculate the data product count.
430 : //
431 0 : ULong_t prd_cnt = 0;
432 : // std::map<art::BranchID, std::shared_ptr<art::Group>>::const_iterator
433 0 : for (auto I = principal.begin(), E = principal.end(); I != E; ++I)
434 : {
435 0 : auto const& productDescription = I->second->productDescription();
436 0 : auto const& refs = keptProducts()[productDescription.branchType()];
437 0 : bool found = false;
438 0 : for (auto const& ref : refs)
439 : {
440 0 : if (ref.second == productDescription)
441 : {
442 0 : found = true;
443 0 : break;
444 : }
445 : }
446 0 : if (!I->second->productAvailable() || !found)
447 : {
448 0 : continue;
449 : }
450 0 : ++prd_cnt;
451 : }
452 : //
453 : // Write the data product count.
454 : //
455 0 : TLOG(TLVL_WRITEDATAPRODUCTS, "ArtdaqOutput") << "ArtdaqOutput::writeDataProducts(...): Streaming product count: " +
456 0 : std::to_string(prd_cnt);
457 0 : msg->WriteULong(prd_cnt);
458 0 : TLOG(TLVL_WRITEDATAPRODUCTS, "ArtdaqOutput") << "ArtdaqOutput::writeDataProducts(...): Finished streaming product count.";
459 :
460 : //
461 : // Loop over the groups in the RunPrincipal and
462 : // write out the data products.
463 : //
464 : // Note: We need this vector of keys because the ROOT I/O mechanism
465 : // requires that each object inserted in the message has a
466 : // unique address, so we force that by holding on to each
467 : // branch key manufactured in the loop until after we are
468 : // done constructing the message.
469 : //
470 0 : bkv.reserve(prd_cnt);
471 : // std::map<art::BranchID, std::shared_ptr<art::Group>>::const_iterator
472 0 : for (auto I = principal.begin(), E = principal.end(); I != E; ++I)
473 : {
474 0 : auto const& productDescription = I->second->productDescription();
475 0 : auto const& refs = keptProducts()[productDescription.branchType()];
476 0 : bool found = false;
477 0 : for (auto const& ref : refs)
478 : {
479 0 : if (ref.second == productDescription)
480 : {
481 0 : found = true;
482 0 : break;
483 : }
484 : }
485 0 : if (!I->second->productAvailable() || !found)
486 : {
487 0 : continue;
488 : }
489 0 : const BranchDescription& bd(I->second->productDescription());
490 0 : bkv.push_back(new BranchKey(bd));
491 0 : TLOG(TLVL_WRITEDATAPRODUCTS_VERBOSE, "ArtdaqOutput")
492 0 : << "ArtdaqOutput::writeDataProducts(...): Dumping branch key of class: '"
493 0 : << bkv.back()->friendlyClassName_ << "' modlbl: '" << bkv.back()->moduleLabel_ << "' instnm: '"
494 0 : << bkv.back()->productInstanceName_ << "' procnm: '" << bkv.back()->processName_ << "'";
495 0 : TLOG(TLVL_WRITEDATAPRODUCTS, "ArtdaqOutput") << "ArtdaqOutput::writeDataProducts(...): "
496 0 : "Streaming branch key of class: '"
497 0 : << bd.producedClassName() << "' modlbl: '" << bd.moduleLabel() << "' instnm: '"
498 0 : << bd.productInstanceName() << "' procnm: '" << bd.processName() << "'";
499 0 : msg->WriteObjectAny(bkv.back(), branch_key_class);
500 :
501 0 : TLOG(TLVL_WRITEDATAPRODUCTS, "ArtdaqOutput") << "ArtdaqOutput::writeDataProducts(...): "
502 0 : "Streaming product of class: '"
503 0 : << bd.producedClassName() << "' modlbl: '" << bd.moduleLabel() << "' instnm: '"
504 0 : << bd.productInstanceName() << "' procnm: '" << bd.processName() << "'";
505 :
506 0 : OutputHandle oh = principal.getForOutput(bd.productID(), true);
507 0 : const EDProduct* prd = oh.wrapper();
508 0 : TLOG(TLVL_WRITEDATAPRODUCTS, "ArtdaqOutput") << "Class for branch " << bd.wrappedName() << " is "
509 0 : << static_cast<void*>(TClass::GetClass(bd.wrappedName().c_str()));
510 0 : msg->WriteObjectAny(prd, TClass::GetClass(bd.wrappedName().c_str()));
511 0 : TLOG(TLVL_WRITEDATAPRODUCTS, "ArtdaqOutput") << "ArtdaqOutput::writeDataProducts(...): "
512 0 : "Streaming product provenance of class: '"
513 0 : << bd.producedClassName() << "' modlbl: '" << bd.moduleLabel() << "' instnm: '"
514 0 : << bd.productInstanceName() << "' procnm: '" << bd.processName() << "'";
515 :
516 0 : const ProductProvenance* prdprov = I->second->productProvenance().get();
517 :
518 0 : msg->WriteObjectAny(prdprov, prdprov_class);
519 0 : }
520 0 : TLOG(TLVL_WRITEDATAPRODUCTS, "ArtdaqOutput") << "End: ArtdaqOutput::writeDataProducts(...)";
521 0 : }
522 :
523 0 : inline void art::ArtdaqOutput::write(EventPrincipal& ep)
524 : {
525 : //
526 : // Write an Event message.
527 : //
528 0 : TLOG(TLVL_ENTER_EXIT, "ArtdaqOutput") << "Begin: ArtdaqOutput::write(EventPrincipal& ep)";
529 0 : if (!initMsgSent_)
530 : {
531 0 : send_init_message();
532 : }
533 : //
534 : // Get root classes needed for I/O.
535 : //
536 0 : static TClass* run_aux_class = TClass::GetClass("art::RunAuxiliary");
537 0 : if (run_aux_class == nullptr)
538 : {
539 0 : throw art::Exception(art::errors::DictionaryNotFound) << "ArtdaqOutput::write: " // NOLINT(cert-err60-cpp)
540 0 : "Could not get TClass for art::RunAuxiliary!";
541 : }
542 0 : static TClass* subrun_aux_class = TClass::GetClass("art::SubRunAuxiliary");
543 0 : if (subrun_aux_class == nullptr)
544 : {
545 0 : throw art::Exception(art::errors::DictionaryNotFound) << "ArtdaqOutput::write: " // NOLINT(cert-err60-cpp)
546 0 : "Could not get TClass for art::SubRunAuxiliary!";
547 : }
548 0 : static TClass* event_aux_class = TClass::GetClass("art::EventAuxiliary");
549 0 : if (event_aux_class == nullptr)
550 : {
551 0 : throw art::Exception(art::errors::DictionaryNotFound) << "ArtdaqOutput::write: " // NOLINT(cert-err60-cpp)
552 0 : "Could not get TClass for art::EventAuxiliary!";
553 : }
554 :
555 : // Subrun number starts at 1
556 0 : TLOG(TLVL_WRITE, "ArtdaqOutput") << "ArtdaqOutput::write: Setting Output Fragment Header Fields";
557 0 : auto seqID = (static_cast<uint64_t>(ep.eventID().subRun()) << 32) + ep.eventID().event();
558 :
559 0 : art::ProcessTag tag("", processName());
560 0 : auto res = ep.getMany(art::ModuleContext::invalid(), art::WrappedTypeID::make<artdaq::detail::RawEventHeader>(), art::MatchAllSelector(), tag);
561 :
562 0 : artdaq::Fragment::timestamp_t ts = 0;
563 :
564 0 : for (auto const& qr : res)
565 : {
566 0 : Handle<artdaq::detail::RawEventHeader> handle{qr};
567 0 : if (handle.isValid())
568 : {
569 0 : if (handle->timestamp > ts) ts = handle->timestamp;
570 : }
571 0 : }
572 0 : TLOG(TLVL_WRITE, "ArtdaqOutput") << "ArtdaqOutput::write: Data Fragment Header Fields: SeqID: " << seqID << ", timestamp: " << ts;
573 :
574 : //
575 : // Setup message buffer.
576 : //
577 0 : auto msg = prepareMessage(seqID, ts, artdaq::Fragment::DataFragmentType);
578 : //
579 : // Write message type code.
580 : //
581 0 : TLOG(TLVL_WRITE, "ArtdaqOutput") << "ArtdaqOutput::write: Streaming message type code ...";
582 0 : msg->WriteULong(static_cast<ULong_t>(artdaq::NetMonHeader::MessageType::Event));
583 0 : TLOG(TLVL_WRITE, "ArtdaqOutput") << "ArtdaqOutput::write: Finished streaming message type code.";
584 :
585 : //
586 : // Write RunAuxiliary.
587 : //
588 0 : TLOG(TLVL_WRITE, "ArtdaqOutput") << "ArtdaqOutput::write: Streaming RunAuxiliary ...";
589 0 : msg->WriteObjectAny(&ep.subRunPrincipal().runPrincipal().runAux(), run_aux_class);
590 0 : TLOG(TLVL_WRITE, "ArtdaqOutput") << "ArtdaqOutput::write: Finished streaming RunAuxiliary.";
591 :
592 : //
593 : // Write SubRunAuxiliary.
594 : //
595 0 : TLOG(TLVL_WRITE, "ArtdaqOutput") << "ArtdaqOutput::write: Streaming SubRunAuxiliary ...";
596 0 : msg->WriteObjectAny(&ep.subRunPrincipal().subRunAux(), subrun_aux_class);
597 0 : TLOG(TLVL_WRITE, "ArtdaqOutput") << "ArtdaqOutput::write: Finished streaming SubRunAuxiliary.";
598 :
599 : //
600 : // Write EventAuxiliary.
601 : //
602 : {
603 0 : TLOG(TLVL_WRITE, "ArtdaqOutput") << "ArtdaqOutput::write: Streaming EventAuxiliary ...";
604 0 : msg->WriteObjectAny(&ep.eventAux(), event_aux_class);
605 0 : TLOG(TLVL_WRITE, "ArtdaqOutput") << "ArtdaqOutput::write: Finished streaming EventAuxiliary.";
606 : }
607 :
608 : //
609 : // Write data products.
610 : //
611 0 : std::vector<BranchKey*> bkv;
612 0 : writeDataProducts(msg, ep, bkv);
613 :
614 0 : TLOG(TLVL_WRITE, "ArtdaqOutput") << "ArtdaqOutput::write: Sending message";
615 0 : sendMessage(msg);
616 0 : TLOG(TLVL_WRITE, "ArtdaqOutput") << "ArtdaqOutput::write: Done sending message";
617 :
618 : //
619 : // Delete the branch keys we created for the message.
620 : //
621 0 : for (auto I = bkv.begin(), E = bkv.end(); I != E; ++I)
622 : {
623 0 : delete *I;
624 0 : *I = 0;
625 : }
626 0 : TLOG(TLVL_ENTER_EXIT, "ArtdaqOutput") << "End: ArtdaqOutput::write(EventPrincipal& ep)";
627 0 : }
628 :
629 0 : inline void art::ArtdaqOutput::writeSubRun(SubRunPrincipal& srp)
630 : { //
631 : // Write an SubRun message.
632 : //
633 0 : TLOG(TLVL_ENTER_EXIT, "ArtdaqOutput") << "Begin: ArtdaqOutput::writeSubRun(SubRunPrincipal& srp)";
634 0 : if (!initMsgSent_)
635 : {
636 : // TLOG(TLVL_WARNING, "ArtdaqOutput") << "Not sending Subrun message before Event!";
637 : // return;
638 0 : send_init_message();
639 : }
640 :
641 : //
642 : // Get root classes needed for I/O.
643 : //
644 0 : static TClass* run_aux_class = TClass::GetClass("art::RunAuxiliary");
645 0 : if (run_aux_class == nullptr)
646 : {
647 0 : throw art::Exception(art::errors::DictionaryNotFound) << "ArtdaqOutput::writeSubRun: " // NOLINT(cert-err60-cpp)
648 0 : "Could not get TClass for art::RunAuxiliary!";
649 : }
650 0 : static TClass* subrun_aux_class = TClass::GetClass("art::SubRunAuxiliary");
651 0 : if (subrun_aux_class == nullptr)
652 : {
653 0 : throw art::Exception(art::errors::DictionaryNotFound) << "ArtdaqOutput::writeSubRun: " // NOLINT(cert-err60-cpp)
654 0 : "Could not get TClass for art::SubRunAuxiliary!";
655 : }
656 : //
657 : // Begin preparing message.
658 : //
659 0 : auto msg = prepareMessage(static_cast<uint64_t>(srp.subRun()) << 32, srp.subRun() + 1, artdaq::Fragment::SubrunDataFragmentType);
660 : //
661 : // Write message type code.
662 : //
663 : {
664 0 : TLOG(TLVL_WRITESUBRUN, "ArtdaqOutput") << "ArtdaqOutput::writeSubRun: streaming message type code ...";
665 0 : msg->WriteULong(static_cast<ULong_t>(artdaq::NetMonHeader::MessageType::Subrun));
666 0 : TLOG(TLVL_WRITESUBRUN, "ArtdaqOutput") << "ArtdaqOutput::writeSubRun: finished streaming message type code.";
667 : }
668 : //
669 : // Write RunAuxiliary.
670 : //
671 0 : TLOG(TLVL_WRITESUBRUN, "ArtdaqOutput") << "ArtdaqOutput::writeSubRun: Streaming RunAuxiliary ...";
672 0 : msg->WriteObjectAny(&srp.runPrincipal().runAux(), run_aux_class);
673 0 : TLOG(TLVL_WRITESUBRUN, "ArtdaqOutput") << "ArtdaqOutput::writeSubRun: Finished streaming RunAuxiliary.";
674 :
675 : //
676 : // Write SubRunAuxiliary.
677 : //
678 : {
679 0 : TLOG(TLVL_WRITESUBRUN, "ArtdaqOutput") << "ArtdaqOutput::writeSubRun: streaming SubRunAuxiliary ...";
680 :
681 0 : TLOG(TLVL_WRITESUBRUN_VERBOSE, "ArtdaqOutput") << "ArtdaqOutput::writeSubRun: dumping ProcessHistoryRegistry ...";
682 : // typedef std::map<const ProcessHistoryID,ProcessHistory>
683 : // ProcessHistoryMap;
684 0 : for (auto I = std::begin(art::ProcessHistoryRegistry::get()), E = std::end(art::ProcessHistoryRegistry::get());
685 0 : I != E; ++I)
686 : {
687 0 : std::ostringstream OS;
688 0 : I->first.print(OS);
689 0 : TLOG(TLVL_WRITESUBRUN_VERBOSE, "ArtdaqOutput") << "ArtdaqOutput::writeSubRun: phr: id: '" << OS.str() << "'";
690 0 : OS.str("");
691 0 : TLOG(TLVL_WRITESUBRUN_VERBOSE, "ArtdaqOutput") << "ArtdaqOutput::writeSubRun: phr: data.size(): " << I->second.data().size();
692 0 : if (!I->second.data().empty())
693 : {
694 0 : I->second.data().back().id().print(OS);
695 0 : TLOG(TLVL_WRITESUBRUN_VERBOSE, "ArtdaqOutput") << "ArtdaqOutput::writeSubRun: phr: data.back().id(): '" << OS.str() << "'";
696 : }
697 0 : }
698 0 : if (!srp.subRunAux().processHistoryID().isValid())
699 : {
700 0 : TLOG(TLVL_WRITESUBRUN_VERBOSE, "ArtdaqOutput") << "ArtdaqOutput::writeSubRun: ProcessHistoryID: 'INVALID'";
701 : }
702 : else
703 : {
704 0 : std::ostringstream OS;
705 0 : srp.subRunAux().processHistoryID().print(OS);
706 0 : TLOG(TLVL_WRITESUBRUN_VERBOSE, "ArtdaqOutput") << "ArtdaqOutput::writeSubRun: ProcessHistoryID: '" << OS.str() << "'";
707 0 : OS.str("");
708 0 : ProcessHistory processHistory;
709 0 : ProcessHistoryRegistry::get(srp.subRunAux().processHistoryID(), processHistory);
710 0 : if (!processHistory.data().empty())
711 : {
712 : // FIXME: Print something special on invalid id() here!
713 0 : processHistory.data().back().id().print(OS);
714 0 : TLOG(TLVL_WRITESUBRUN_VERBOSE, "ArtdaqOutput") << "ArtdaqOutput::writeSubRun: ProcessConfigurationID: '" << OS.str() << "'";
715 0 : OS.str("");
716 0 : OS << processHistory.data().back();
717 0 : TLOG(TLVL_WRITESUBRUN_VERBOSE, "ArtdaqOutput") << "ArtdaqOutput::writeSubRun: ProcessConfiguration: '" << OS.str();
718 : }
719 0 : }
720 0 : msg->WriteObjectAny(&srp.subRunAux(), subrun_aux_class);
721 0 : TLOG(TLVL_WRITESUBRUN, "ArtdaqOutput") << "ArtdaqOutput::writeSubRun: streamed SubRunAuxiliary.";
722 : }
723 : //
724 : // Write data products.
725 : //
726 0 : std::vector<BranchKey*> bkv;
727 0 : writeDataProducts(msg, srp, bkv);
728 :
729 0 : TLOG(TLVL_WRITESUBRUN, "ArtdaqOutput") << "ArtdaqOutput::writeSubRun: Sending message";
730 0 : sendMessage(msg);
731 0 : TLOG(TLVL_WRITESUBRUN, "ArtdaqOutput") << "ArtdaqOutput::writeSubRun: Done sending message";
732 :
733 : //
734 : // Delete the branch keys we created for the message.
735 : //
736 0 : for (auto I = bkv.begin(), E = bkv.end(); I != E; ++I)
737 : {
738 0 : delete *I;
739 0 : *I = 0;
740 : }
741 0 : TLOG(TLVL_ENTER_EXIT, "ArtdaqOutput") << "End: ArtdaqOutput::writeSubRun(const SubRunPrincipal& srp)";
742 0 : }
743 :
744 0 : inline void art::ArtdaqOutput::writeRun(RunPrincipal& rp)
745 : { //
746 : // Write an Run message.
747 : //
748 0 : TLOG(TLVL_ENTER_EXIT, "ArtdaqOutput") << "Begin: ArtdaqOutput::writeRun(const RunPrincipal& rp)";
749 : (void)rp;
750 0 : if (!initMsgSent_)
751 : {
752 : // TLOG(TLVL_WARNING, "ArtdaqOutput") << "Not sending Run message before Event!";
753 : // return;
754 0 : send_init_message();
755 : }
756 :
757 : //
758 : // Fetch the class dictionaries we need for
759 : // writing out the auxiliary information.
760 : //
761 0 : static TClass* run_aux_class = TClass::GetClass("art::RunAuxiliary");
762 0 : assert(run_aux_class != nullptr && "writeRun: Could not get TClass for art::RunAuxiliary!");
763 : //
764 : // Begin preparing message.
765 : //
766 0 : auto msg = prepareMessage(artdaq::Fragment::InvalidSequenceID - 1, rp.run() + 1, artdaq::Fragment::RunDataFragmentType);
767 : //
768 : // Write message type code.
769 : //
770 : {
771 0 : TLOG(TLVL_WRITERUN, "ArtdaqOutput") << "ArtdaqOutput::writeRun: streaming message type code ...";
772 0 : msg->WriteULong(static_cast<ULong_t>(artdaq::NetMonHeader::MessageType::Run));
773 0 : TLOG(TLVL_WRITERUN, "ArtdaqOutput") << "ArtdaqOutput::writeRun: finished streaming message type code.";
774 : }
775 : //
776 : // Write RunAuxiliary.
777 : //
778 : {
779 0 : TLOG(TLVL_WRITERUN, "ArtdaqOutput") << "ArtdaqOutput::writeRun: streaming RunAuxiliary ...";
780 0 : msg->WriteObjectAny(&rp.runAux(), run_aux_class);
781 0 : TLOG(TLVL_WRITERUN, "ArtdaqOutput") << "ArtdaqOutput::writeRun: streamed RunAuxiliary.";
782 : }
783 : //
784 : // Write data products.
785 : //
786 0 : std::vector<BranchKey*> bkv;
787 0 : writeDataProducts(msg, rp, bkv);
788 :
789 0 : TLOG(TLVL_WRITERUN, "ArtdaqOutput") << "ArtdaqOutput::writeRun: Sending message";
790 0 : sendMessage(msg);
791 0 : TLOG(TLVL_WRITERUN, "ArtdaqOutput") << "ArtdaqOutput::writeRun: Done sending message";
792 :
793 0 : for (auto I = bkv.begin(), E = bkv.end(); I != E; ++I)
794 : {
795 0 : delete *I;
796 0 : *I = 0;
797 : }
798 0 : TLOG(TLVL_ENTER_EXIT, "ArtdaqOutput") << "End: ArtdaqOutput::writeRun(RunPrincipal& rp)";
799 0 : }
800 :
801 0 : inline void art::ArtdaqOutput::beginJob()
802 : {
803 0 : TLOG(TLVL_ENTER_EXIT, "ArtdaqOutput") << "Begin: ArtdaqOutput::beginJob()";
804 0 : auto const& products = keptProducts();
805 :
806 : // std::array<Selections, NumBranchTypes>
807 0 : for (auto const& selections : products)
808 : {
809 : // Selections = std::map<ProductID, BranchDescription>
810 0 : for (auto const& prod_pair : selections)
811 : {
812 0 : auto const& productDescription = prod_pair.second;
813 0 : auto const& branchKey = BranchKey(productDescription);
814 :
815 0 : if (!productList_.count(branchKey))
816 : {
817 0 : TLOG(TLVL_BEGINJOB_VERBOSE, "ArtdaqOutput") << "ArtdaqOutput::beginJob:"
818 0 : << "Adding branch key to productList of class: '"
819 0 : << branchKey.friendlyClassName_ << "' modlbl: '" << branchKey.moduleLabel_ << "' instnm: '"
820 0 : << branchKey.productInstanceName_ << "' procnm: '" << branchKey.processName_ << "'"
821 0 : << ", description: " << productDescription.wrappedName();
822 :
823 0 : productList_[branchKey] = productDescription;
824 : }
825 0 : }
826 : }
827 0 : TLOG(TLVL_ENTER_EXIT, "ArtdaqOutput") << "End: ArtdaqOutput::beginJob() Product list sz=" << productList_.size();
828 :
829 : // send_init_message();
830 0 : }
831 :
832 : #endif // ARTDAQ_ARTDAQ_ARTMODULES_ARTDAQOUTPUT_HH_
|