Line data Source code
1 : #define TRACE_NAME "BinaryNetOutput"
2 : #include "artdaq/DAQdata/Globals.hh"
3 :
4 : #include "art/Framework/Core/ModuleMacros.h"
5 : #include "art/Framework/Core/OutputModule.h"
6 : #include "art/Framework/Principal/EventPrincipal.h"
7 : #include "art/Framework/Principal/Handle.h"
8 : #include "art/Framework/Principal/RunPrincipal.h"
9 : #include "art/Framework/Principal/Selector.h"
10 : #include "art/Framework/Principal/SubRunPrincipal.h"
11 : #include "art/Persistency/Common/GroupQueryResult.h"
12 : #include "art/Persistency/Provenance/ModuleContext.h"
13 : #include "canvas/Persistency/Common/WrappedTypeID.h"
14 : #include "canvas/Persistency/Common/Wrapper.h"
15 : #include "canvas/Utilities/DebugMacros.h"
16 : #include "canvas/Utilities/Exception.h"
17 : #include "fhiclcpp/ParameterSet.h"
18 :
19 : #include "artdaq-core/Data/Fragment.hh"
20 : #include "artdaq/DAQrate/DataSenderManager.hh"
21 :
22 : #include <unistd.h>
23 : #include <iomanip>
24 : #include <iostream>
25 : #include <memory>
26 : #include <sstream>
27 : #include <string>
28 : #include <vector>
29 :
30 : namespace art {
31 : class BinaryNetOutput;
32 : }
33 :
34 : using art::BinaryNetOutput;
35 : using fhicl::ParameterSet;
36 :
37 : /**
38 : * \brief An art::OutputModule which sends Fragments using DataSenderManager.
39 : * This module produces output identical to that of a BoardReader, for use in
40 : * systems which have multiple layers of EventBuilders.
41 : */
42 : class art::BinaryNetOutput final : public OutputModule
43 : {
44 : public:
45 : /**
46 : * \brief BinaryNetOutput Constructor
47 : * \param ps ParameterSet used to configure BinaryNetOutput
48 : *
49 : * BinaryNetOutput forwards its ParameterSet to art::OutputModule,
50 : * so any Parameters it requires are also required by BinaryNetOutput.
51 : * BinaryNetOutput also forwards its ParameterSet to DataSenderManager,
52 : * so any Parameters *it* requires are *also* required by BinaryMPIOuptut.
53 : * Finally, BinaryNetOutput accpets the following parameters:
54 : * "rt_priority" (Default: 0): Priority for this thread
55 : * "module_name" (Default: BinaryNetOutput): Friendly name for this module (MessageFacility Category)
56 : */
57 : explicit BinaryNetOutput(ParameterSet const& ps);
58 :
59 : /**
60 : * \brief BinaryNetOutput Destructor
61 : */
62 : ~BinaryNetOutput() override;
63 :
64 : private:
65 : BinaryNetOutput(BinaryNetOutput const&) = delete;
66 : BinaryNetOutput(BinaryNetOutput&&) = delete;
67 : BinaryNetOutput& operator=(BinaryNetOutput const&) = delete;
68 : BinaryNetOutput& operator=(BinaryNetOutput&&) = delete;
69 :
70 : void beginJob() override;
71 :
72 : void endJob() override;
73 :
74 : void write(EventPrincipal& /*ep*/) override;
75 :
76 0 : void writeRun(RunPrincipal& /*r*/) override{};
77 0 : void writeSubRun(SubRunPrincipal& /*sr*/) override{};
78 :
79 : void initialize_MPI_();
80 :
81 : void deinitialize_MPI_();
82 :
83 : bool readParameterSet_(fhicl::ParameterSet const& pset);
84 :
85 : private:
86 : ParameterSet data_pset_;
87 : std::string name_ = "BinaryNetOutput";
88 : int rt_priority_ = 0;
89 : std::unique_ptr<artdaq::DataSenderManager> sender_ptr_ = {nullptr};
90 : };
91 :
92 0 : art::BinaryNetOutput::BinaryNetOutput(ParameterSet const& ps)
93 0 : : OutputModule(ps)
94 : {
95 0 : TLOG(TLVL_DEBUG + 32) << "Begin: BinaryNetOutput::BinaryNetOutput(ParameterSet const& ps)\n";
96 0 : readParameterSet_(ps);
97 0 : TLOG(TLVL_DEBUG + 32) << "End: BinaryNetOutput::BinaryNetOutput(ParameterSet const& ps)\n";
98 0 : }
99 :
100 0 : art::BinaryNetOutput::~BinaryNetOutput() { TLOG(TLVL_DEBUG + 32) << "Begin/End: BinaryNetOutput::~BinaryNetOutput()\n"; }
101 :
102 0 : void art::BinaryNetOutput::beginJob()
103 : {
104 0 : TLOG(TLVL_DEBUG + 32) << "Begin: BinaryNetOutput::beginJob()\n";
105 0 : initialize_MPI_();
106 0 : TLOG(TLVL_DEBUG + 32) << "End: BinaryNetOutput::beginJob()\n";
107 0 : }
108 :
109 0 : void art::BinaryNetOutput::endJob()
110 : {
111 0 : TLOG(TLVL_DEBUG + 32) << "Begin: BinaryNetOutput::endJob()\n";
112 0 : deinitialize_MPI_();
113 0 : TLOG(TLVL_DEBUG + 32) << "End: BinaryNetOutput::endJob()\n";
114 0 : }
115 :
116 0 : void art::BinaryNetOutput::initialize_MPI_()
117 : {
118 0 : if (rt_priority_ > 0)
119 : {
120 : #pragma GCC diagnostic push
121 : #pragma GCC diagnostic ignored "-Wmissing-field-initializers"
122 0 : sched_param s_param = {};
123 0 : s_param.sched_priority = rt_priority_;
124 0 : int status = pthread_setschedparam(pthread_self(), SCHED_RR, &s_param);
125 0 : if (status != 0)
126 : {
127 0 : TLOG(TLVL_ERROR) << name_ << "Failed to set realtime priority to " << rt_priority_
128 0 : << ", return code = " << status;
129 : }
130 : #pragma GCC diagnostic pop
131 : }
132 :
133 0 : sender_ptr_ = std::make_unique<artdaq::DataSenderManager>(data_pset_);
134 0 : assert(sender_ptr_);
135 0 : }
136 :
137 0 : void art::BinaryNetOutput::deinitialize_MPI_() { sender_ptr_.reset(nullptr); }
138 :
139 0 : bool art::BinaryNetOutput::readParameterSet_(fhicl::ParameterSet const& pset)
140 : {
141 0 : TLOG(TLVL_DEBUG + 32) << name_ << "BinaryNetOutput::readParameterSet_ method called with "
142 0 : << "ParameterSet = \"" << pset.to_string() << "\".";
143 :
144 : // determine the data sending parameters
145 0 : data_pset_ = pset;
146 0 : name_ = pset.get<std::string>("module_name", "BinaryNetOutput");
147 0 : rt_priority_ = pset.get<int>("rt_priority", 0);
148 :
149 0 : TLOG(TLVL_DEBUG + 33) << "BinaryNetOutput::readParameterSet()";
150 :
151 0 : return true;
152 : }
153 :
154 0 : void art::BinaryNetOutput::write(EventPrincipal& ep)
155 : {
156 0 : assert(sender_ptr_);
157 :
158 : using RawEvent = artdaq::Fragments;
159 : ;
160 : using RawEvents = std::vector<RawEvent>;
161 : using RawEventHandle = art::Handle<RawEvent>;
162 : using RawEventHandles = std::vector<RawEventHandle>;
163 :
164 0 : auto result_handles = std::vector<art::GroupQueryResult>();
165 :
166 0 : auto const& wrapped = art::WrappedTypeID::make<RawEvent>();
167 :
168 0 : ModuleContext const mc{moduleDescription()};
169 0 : ProcessTag const processTag{"", mc.moduleDescription().processName()};
170 :
171 0 : result_handles = ep.getMany(mc, wrapped, art::MatchAllSelector{}, processTag);
172 :
173 0 : artdaq::Fragment::sequence_id_t sequence_id = 0;
174 :
175 0 : for (auto const& result_handle : result_handles)
176 : {
177 0 : auto const raw_event_handle = RawEventHandle(result_handle);
178 :
179 0 : if (!raw_event_handle.isValid())
180 : {
181 0 : continue;
182 : }
183 :
184 0 : for (auto const& fragment : *raw_event_handle)
185 : {
186 0 : auto fragment_copy = fragment;
187 0 : auto fragid_id = fragment_copy.fragmentID();
188 0 : sequence_id = fragment_copy.sequenceID();
189 0 : TLOG(TLVL_DEBUG + 32) << "BinaryNetOutput::write seq=" << sequence_id << " frag=" << fragid_id << " start";
190 0 : sender_ptr_->sendFragment(std::move(fragment_copy));
191 0 : TLOG(TLVL_DEBUG + 32) << "BinaryNetOutput::write seq=" << sequence_id << " frag=" << fragid_id << " done";
192 0 : }
193 0 : }
194 :
195 : // Events are unique in art, so this will be the only send with this sequence ID!
196 : // ELF 1/23/2020: Only remove routing entry AFTER all Fragments have been sent!
197 0 : sender_ptr_->RemoveRoutingTableEntry(sequence_id);
198 0 : }
199 :
// art framework macro invoked once for this module class; presumably generates the
// plugin entry points art uses to load BinaryNetOutput by name (confirm against the art docs).
200 0 : DEFINE_ART_MODULE(art::BinaryNetOutput) // NOLINT(performance-unnecessary-value-param)
|