Line data Source code
1 : #include "TRACE/tracemf.h"
2 : #define TRACE_NAME "BinaryFileOutput"
3 :
4 : #include "art/Framework/Core/ModuleMacros.h"
5 : #include "art/Framework/Core/OutputModule.h"
6 : #include "art/Framework/IO/FileStatsCollector.h"
7 : #include "art/Framework/IO/PostCloseFileRenamer.h"
8 : #include "art/Framework/Principal/EventPrincipal.h"
9 : #include "art/Framework/Principal/Handle.h"
10 : #include "art/Framework/Principal/RunPrincipal.h"
11 : #include "art/Framework/Principal/SubRunPrincipal.h"
12 : #include "art/Persistency/Common/GroupQueryResult.h"
13 : #include "art/Persistency/Provenance/ModuleContext.h"
14 : #include "canvas/Persistency/Common/WrappedTypeID.h"
15 : #include "canvas/Persistency/Common/Wrapper.h"
16 : #include "canvas/Utilities/DebugMacros.h"
17 : #include "canvas/Utilities/Exception.h"
18 : #include "fhiclcpp/ParameterSet.h"
19 :
20 : #include "artdaq-core/Data/Fragment.hh"
21 : #include "artdaq/DAQdata/Globals.hh"
22 :
#include <fcntl.h>
#include <unistd.h>
#include <cerrno>
#include <cstdio>
#include <cstring>
#include <fstream>
#include <iomanip>
#include <iostream>
#include <memory>
#include <sstream>
#include <string>
#include <vector>
32 :
33 : namespace art {
34 : class BinaryFileOutput;
35 : }
36 :
37 : using art::BinaryFileOutput;
38 : using fhicl::ParameterSet;
39 :
40 : /**
41 : * \brief The BinaryFileOutput module streams art Events to a binary file, bypassing ROOT
42 : */
43 : class art::BinaryFileOutput final : public OutputModule
44 : {
45 : public:
46 : /**
47 : * \brief BinaryFileOutput Constructor
48 : * \param ps ParameterSet used to configure BinaryFileOutput
49 : *
50 : * BinaryFileOutput accepts the same configuration parameters as art::OutputModule.
51 : * It has the same name substitution code that RootOutput uses to uniquify names.
52 : *
53 : * BinaryFileOutput also expects the following Parameters:
54 : * "fileName" (REQUIRED): Name of the file to write
55 : * "directIO" (Default: false): Whether to use O_DIRECT (Ref. issue #24437)
56 : */
57 : explicit BinaryFileOutput(ParameterSet const& ps);
58 :
59 : /**
60 : * \brief BinaryFileOutput Destructor
61 : */
62 : ~BinaryFileOutput() override;
63 :
64 : private:
65 : void beginJob() override;
66 :
67 : void endJob() override;
68 :
69 : void write(EventPrincipal& /*ep*/) override;
70 :
71 0 : void writeRun(RunPrincipal& /*r*/) override{};
72 0 : void writeSubRun(SubRunPrincipal& /*sr*/) override{};
73 :
74 : void initialize_FILE_();
75 :
76 : void deinitialize_FILE_();
77 :
78 : bool readParameterSet_(fhicl::ParameterSet const& pset);
79 :
80 : private:
81 : BinaryFileOutput(BinaryFileOutput const&) = delete;
82 : BinaryFileOutput(BinaryFileOutput&&) = delete;
83 : BinaryFileOutput& operator=(BinaryFileOutput const&) = delete;
84 : BinaryFileOutput& operator=(BinaryFileOutput&&) = delete;
85 :
86 : std::string name_ = "BinaryFileOutput";
87 : std::string file_name_ = "/tmp/artdaqdemo.binary";
88 : bool do_direct_ = false;
89 : int fd_ = -1; // Used for direct IO
90 : std::unique_ptr<std::ofstream> file_ptr_ = {nullptr};
91 : art::FileStatsCollector fstats_;
92 : };
93 :
94 0 : art::BinaryFileOutput::BinaryFileOutput(ParameterSet const& ps)
95 0 : : OutputModule(ps), fstats_{name_, processName()}
96 : {
97 0 : TLOG(TLVL_DEBUG + 32) << "Begin: BinaryFileOutput::BinaryFileOutput(ParameterSet const& ps)\n";
98 0 : readParameterSet_(ps);
99 0 : TLOG(TLVL_DEBUG + 32) << "End: BinaryFileOutput::BinaryFileOutput(ParameterSet const& ps)\n";
100 0 : }
101 :
102 0 : art::BinaryFileOutput::~BinaryFileOutput() { TLOG(TLVL_DEBUG + 32) << "Begin/End: BinaryFileOutput::~BinaryFileOutput()\n"; }
103 :
104 0 : void art::BinaryFileOutput::beginJob()
105 : {
106 0 : TLOG(TLVL_DEBUG + 32) << "Begin: BinaryFileOutput::beginJob()\n";
107 0 : initialize_FILE_();
108 0 : TLOG(TLVL_DEBUG + 32) << "End: BinaryFileOutput::beginJob()\n";
109 0 : }
110 :
111 0 : void art::BinaryFileOutput::endJob()
112 : {
113 0 : TLOG(TLVL_DEBUG + 32) << "Begin: BinaryFileOutput::endJob()\n";
114 0 : deinitialize_FILE_();
115 0 : TLOG(TLVL_DEBUG + 32) << "End: BinaryFileOutput::endJob()\n";
116 0 : }
117 :
118 0 : void art::BinaryFileOutput::initialize_FILE_()
119 : {
120 0 : std::string file_name = PostCloseFileRenamer{fstats_}.applySubstitutions(file_name_);
121 0 : if (do_direct_)
122 : {
123 0 : fd_ = open(file_name.c_str(), O_WRONLY | O_CREAT | O_DIRECT, 0660); // O_DIRECT - ref. artdaq-core/Core/QuickVec.hh:#define QV_ALIGN 512 and artdaq issue #24437
124 0 : TLOG(TLVL_DEBUG + 33) << "initialize_FILE_ fd_=" << fd_;
125 : }
126 : else
127 : {
128 0 : file_ptr_ = std::make_unique<std::ofstream>(file_name, std::ofstream::binary);
129 0 : TLOG(TLVL_DEBUG + 33) << "BinaryFileOutput::initialize_FILE_ file_ptr_=" << static_cast<void*>(file_ptr_.get()) << " errno=" << errno;
130 : // file_ptr_->rdbuf()->pubsetbuf(0, 0);
131 : }
132 0 : fstats_.recordFileOpen();
133 0 : }
134 :
135 0 : void art::BinaryFileOutput::deinitialize_FILE_()
136 : {
137 0 : if (do_direct_)
138 : {
139 0 : close(fd_);
140 0 : fd_ = -1;
141 : }
142 : else
143 : {
144 0 : file_ptr_.reset(nullptr);
145 : }
146 0 : fstats_.recordFileClose();
147 0 : }
148 :
149 0 : bool art::BinaryFileOutput::readParameterSet_(fhicl::ParameterSet const& pset)
150 : {
151 0 : TLOG(TLVL_DEBUG + 32) << name_ << "BinaryFileOutput::readParameterSet_ method called with "
152 0 : << "ParameterSet = \"" << pset.to_string() << "\".";
153 : // determine the data sending parameters
154 : try
155 : {
156 0 : file_name_ = pset.get<std::string>("fileName");
157 : }
158 0 : catch (...)
159 : {
160 0 : TLOG(TLVL_ERROR) << name_ << "The fileName parameter was not specified "
161 0 : << "in the BinaryFileOutput initialization PSet: \"" << pset.to_string() << "\".";
162 0 : return false;
163 0 : }
164 0 : do_direct_ = pset.get<bool>("directIO", false);
165 : // determine the data sending parameters
166 0 : return true;
167 : }
168 :
169 : #define USE_STATIC_BUFFER 0
170 : #if USE_STATIC_BUFFER == 1
171 : static unsigned char static_buffer[0xa00000];
172 : #endif
173 :
174 0 : void art::BinaryFileOutput::write(EventPrincipal& ep)
175 : {
176 : using RawEvent = artdaq::Fragments;
177 : using RawEvents = std::vector<RawEvent>;
178 : using RawEventHandle = art::Handle<RawEvent>;
179 : using RawEventHandles = std::vector<RawEventHandle>;
180 :
181 0 : auto result_handles = std::vector<art::GroupQueryResult>();
182 0 : auto const& wrapped = art::WrappedTypeID::make<RawEvent>();
183 0 : ModuleContext const mc{moduleDescription()};
184 0 : ProcessTag const processTag{"", mc.moduleDescription().processName()};
185 :
186 0 : result_handles = ep.getMany(mc, wrapped, art::MatchAllSelector{}, processTag);
187 :
188 0 : for (auto const& result_handle : result_handles)
189 : {
190 0 : auto const raw_event_handle = RawEventHandle(result_handle);
191 :
192 0 : if (!raw_event_handle.isValid())
193 : {
194 0 : continue;
195 : }
196 :
197 0 : for (auto const& fragment : *raw_event_handle)
198 : {
199 0 : auto sequence_id = fragment.sequenceID();
200 0 : auto fragid_id = fragment.fragmentID();
201 0 : TLOG(TLVL_DEBUG + 33) << "BinaryFileOutput::write seq=" << sequence_id << " frag=" << fragid_id << " "
202 0 : << reinterpret_cast<const void*>(fragment.headerBeginBytes()) << " bytes=0x" << std::hex // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
203 0 : << fragment.sizeBytes() << " start";
204 0 : if (do_direct_)
205 : {
206 0 : ssize_t sts = ::write(fd_, reinterpret_cast<const char*>(fragment.headerBeginBytes()), fragment.sizeBytes()); // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
207 0 : TLOG(TLVL_DEBUG + 34) << "BinaryFileOutput::write seq=" << sequence_id << " frag=" << fragid_id << " done sts=" << sts
208 0 : << " errno=" << errno;
209 : }
210 : else
211 : {
212 : #if USE_STATIC_BUFFER == 1
213 : file_ptr_->write((char*)static_buffer, fragment.sizeBytes());
214 : #else
215 0 : file_ptr_->write(reinterpret_cast<const char*>(fragment.headerBeginBytes()), fragment.sizeBytes()); // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
216 : #endif
217 0 : TLOG(TLVL_DEBUG + 34) << "BinaryFileOutput::write seq=" << sequence_id << " frag=" << fragid_id << " done errno=" << errno;
218 : }
219 : }
220 0 : }
221 0 : fstats_.recordEvent(ep.eventID());
222 0 : }
223 :
224 0 : DEFINE_ART_MODULE(art::BinaryFileOutput) // NOLINT(performance-unnecessary-value-param)
|