Line data Source code
1 : #include "artdaq/DAQdata/Globals.hh"
2 : #define TRACE_NAME "ListenTransferWrapper"
3 :
4 : #include "artdaq-core/Data/Fragment.hh"
5 : #include "artdaq-core/Utilities/ExceptionHandler.hh"
6 : #include "artdaq-core/Utilities/TimeUtils.hh"
7 : #include "artdaq/ArtModules/detail/ListenTransferWrapper.hh"
8 : #include "artdaq/DAQdata/NetMonHeader.hh"
9 : #include "artdaq/TransferPlugins/MakeTransferPlugin.hh"
10 :
11 : #include "cetlib/BasicPluginFactory.h"
12 : #include "cetlib_except/exception.h"
13 : #include "fhiclcpp/ParameterSet.h"
14 :
15 : #include <csignal>
16 : #include <iostream>
17 : #include <limits>
18 : #include <memory>
19 : #include <sstream>
20 : #include <string>
21 :
namespace {
/// Number of the most recent signal seen by listen_signal_handler; stays 0 until a signal arrives
volatile std::sig_atomic_t gListenSignalStatus{0};
}  // namespace
25 :
26 : /**
27 : * \brief Handle a Unix signal
28 : * \param signal Signal to handle
29 : */
30 0 : void listen_signal_handler(int signal)
31 : {
32 0 : gListenSignalStatus = signal;
33 0 : }
34 :
35 0 : artdaq::ListenTransferWrapper::ListenTransferWrapper(const fhicl::ParameterSet& pset)
36 0 : : timeoutInUsecs_(pset.get<std::size_t>("timeoutInUsecs", 100000))
37 0 : , last_received_data_()
38 0 : , last_report_(std::chrono::steady_clock::now())
39 0 : , transfer_(nullptr)
40 0 : , pset_(pset)
41 0 : , maxEventsBeforeInit_(pset.get<std::size_t>("maxEventsBeforeInit", 5))
42 0 : , allowedFragmentTypes_(pset.get<std::vector<int>>("allowedFragmentTypes", {226, 227, 229}))
43 0 : , runningStateTimeout_(pset.get<double>("dispatcherConnectTimeout", 0))
44 0 : , runningStateInterval_us_(pset.get<size_t>("dispatcherConnectRetryInterval_us", 1000000))
45 0 : , quitOnFragmentIntegrityProblem_(pset.get<bool>("quitOnFragmentIntegrityProblem", true))
46 0 : , multi_run_mode_(pset.get<bool>("allowMultipleRuns", true))
47 : {
48 0 : std::signal(SIGINT, listen_signal_handler);
49 :
50 : try
51 : {
52 0 : if (metricMan)
53 : {
54 0 : metricMan->initialize(pset.get<fhicl::ParameterSet>("metrics", fhicl::ParameterSet()), "Online Monitor");
55 0 : metricMan->do_start();
56 : }
57 : }
58 0 : catch (...)
59 : {
60 0 : artdaq::ExceptionHandler(
61 : artdaq::ExceptionHandlerRethrow::no,
62 : "ListenTransferWrapper: could not configure metrics");
63 0 : }
64 :
65 : try
66 : {
67 0 : transfer_ = artdaq::MakeTransferPlugin(pset_, "transfer_plugin", artdaq::TransferInterface::Role::kReceive);
68 : }
69 0 : catch (...)
70 : {
71 0 : artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, "ListenTransferWrapper: failure in call to MakeTransferPlugin");
72 0 : }
73 :
74 : // Clamp possible values
75 0 : if (runningStateInterval_us_ < 1000)
76 : {
77 0 : TLOG(TLVL_WARNING) << "Invalid value " << runningStateInterval_us_ << " us detected for dispatcherConnectRetryInterval_us. Setting to 1000 us";
78 0 : runningStateInterval_us_ = 1000;
79 : }
80 0 : if (runningStateInterval_us_ > 30000000)
81 : {
82 0 : TLOG(TLVL_WARNING) << "Invalid value " << runningStateInterval_us_ << " us detected for dispatcherConnectRetryInterval_us. Setting to 30,000,000 us";
83 0 : runningStateInterval_us_ = 30000000;
84 : }
85 0 : }
86 :
87 0 : artdaq::FragmentPtrs artdaq::ListenTransferWrapper::receiveMessage()
88 : {
89 0 : artdaq::FragmentPtrs fragmentPtrs;
90 0 : bool receivedFragment = false;
91 : static bool initialized = false;
92 : static size_t fragments_received = 0;
93 :
94 0 : while (gListenSignalStatus == 0)
95 : {
96 0 : receivedFragment = false;
97 0 : auto fragmentPtr = std::make_unique<artdaq::Fragment>();
98 :
99 0 : while (!receivedFragment)
100 : {
101 0 : if (gListenSignalStatus != 0)
102 : {
103 0 : TLOG(TLVL_INFO) << "Ctrl-C appears to have been hit";
104 0 : return fragmentPtrs;
105 : }
106 :
107 : try
108 : {
109 0 : auto result = transfer_->receiveFragment(*fragmentPtr, timeoutInUsecs_);
110 :
111 0 : if (result >= artdaq::TransferInterface::RECV_SUCCESS)
112 : {
113 0 : receivedFragment = true;
114 0 : fragments_received++;
115 :
116 : static size_t cntr = 0;
117 0 : auto mod = ++cntr % 10;
118 0 : auto suffix = "-th";
119 0 : if (mod == 1)
120 : {
121 0 : suffix = "-st";
122 : }
123 0 : if (mod == 2)
124 : {
125 0 : suffix = "-nd";
126 : }
127 0 : if (mod == 3)
128 : {
129 0 : suffix = "-rd";
130 : }
131 0 : TLOG(TLVL_INFO) << "Received " << cntr << suffix << " event, "
132 0 : << "seqID == " << fragmentPtr->sequenceID()
133 0 : << ", type == " << fragmentPtr->typeString();
134 0 : last_received_data_ = std::chrono::steady_clock::now();
135 0 : continue;
136 0 : }
137 0 : if (result == artdaq::TransferInterface::DATA_END)
138 : {
139 0 : TLOG(TLVL_ERROR) << "Transfer Plugin disconnected or other unrecoverable error. Shutting down.";
140 0 : if (multi_run_mode_)
141 : {
142 0 : initialized = false;
143 0 : continue;
144 : }
145 0 : return fragmentPtrs;
146 : }
147 : else
148 : {
149 0 : auto tlvl = TLVL_DEBUG + 33;
150 0 : if (artdaq::TimeUtils::GetElapsedTime(last_report_) > 1.0 && artdaq::TimeUtils::GetElapsedTime(last_received_data_) > 1.0)
151 : {
152 0 : tlvl = TLVL_WARNING;
153 0 : last_report_ = std::chrono::steady_clock::now();
154 : }
155 :
156 0 : auto last_received_milliseconds = artdaq::TimeUtils::GetElapsedTimeMilliseconds(last_received_data_);
157 :
158 : // 02-Jun-2018, KAB: added status/result printout
159 : // to-do: add another else clause that explicitly checks for RECV_TIMEOUT
160 0 : TLOG(tlvl) << "Timeout occurred in call to transfer_->receiveFragmentFrom; will try again"
161 0 : << ", status = " << result << ", last received data " << last_received_milliseconds << " ms ago.";
162 : }
163 : }
164 0 : catch (...)
165 : {
166 0 : artdaq::ExceptionHandler(
167 : artdaq::ExceptionHandlerRethrow::yes,
168 : "Problem receiving data in ListenTransferWrapper::receiveMessage");
169 0 : }
170 : }
171 :
172 0 : if (fragmentPtr->type() == artdaq::Fragment::EndOfSubrunFragmentType || fragmentPtr->type() == artdaq::Fragment::EndOfRunFragmentType)
173 : {
174 : // Ignore these for now
175 0 : continue;
176 : }
177 :
178 0 : if (fragmentPtr->type() == artdaq::Fragment::EndOfDataFragmentType)
179 : {
180 : // if (monitorRegistered_)
181 : //{
182 : // unregisterMonitor();
183 : // }
184 0 : if (multi_run_mode_)
185 : {
186 : // initialized = false;
187 0 : continue;
188 : }
189 :
190 0 : return fragmentPtrs;
191 : }
192 :
193 0 : checkIntegrity(*fragmentPtr);
194 :
195 0 : if (initialized || fragmentPtr->type() == artdaq::Fragment::InitFragmentType)
196 : {
197 0 : if (initialized && fragmentPtr->type() == artdaq::Fragment::InitFragmentType)
198 : {
199 : // Ignore reinit for now, maybe handle in ArtdaqInputHelper later
200 0 : continue;
201 : }
202 0 : initialized = true;
203 0 : fragmentPtrs.push_back(std::move(fragmentPtr));
204 0 : break;
205 : }
206 :
207 0 : if (fragments_received > maxEventsBeforeInit_)
208 : {
209 0 : throw cet::exception("ListenTransferWrapper") << "First " << maxEventsBeforeInit_ << " events received did not include the \"Init\" event containing necessary info for art; exiting..."; // NOLINT(cert-err60-cpp)
210 : }
211 0 : }
212 :
213 0 : return fragmentPtrs;
214 0 : }
215 :
216 : std::shared_ptr<ArtdaqEvent>
217 0 : artdaq::ListenTransferWrapper::receiveMessages()
218 : {
219 0 : auto output = std::make_shared<ArtdaqEvent>();
220 :
221 0 : auto ptrs = receiveMessage();
222 0 : for (auto& ptr : ptrs)
223 : {
224 0 : auto fragType = ptr->type();
225 0 : auto fragPtr = ptr.release();
226 0 : ptr.reset(nullptr);
227 :
228 0 : if (output->fragments.count(fragType) == 0u)
229 : {
230 0 : output->fragments[fragType] = std::make_unique<artdaq::Fragments>();
231 : }
232 :
233 0 : output->fragments[fragType]->emplace_back(std::move(*fragPtr));
234 : }
235 :
236 0 : return output;
237 0 : }
238 :
239 0 : void artdaq::ListenTransferWrapper::checkIntegrity(
240 : const artdaq::Fragment& fragment) const
241 : {
242 0 : const size_t artdaqheader = artdaq::detail::RawFragmentHeader::num_words() *
243 : sizeof(artdaq::detail::RawFragmentHeader::RawDataType);
244 0 : const auto payload = static_cast<size_t>(fragment.dataEndBytes() - fragment.dataBeginBytes());
245 0 : const size_t metadata = sizeof(artdaq::NetMonHeader);
246 0 : const size_t totalsize = fragment.sizeBytes();
247 :
248 0 : const auto type = static_cast<size_t>(fragment.type());
249 :
250 0 : if (totalsize != artdaqheader + metadata + payload)
251 : {
252 0 : std::stringstream errmsg;
253 0 : errmsg << "Error: artdaq fragment of type " << fragment.typeString() << ", sequence ID " << fragment.sequenceID() << " has internally inconsistent measures of its size, signalling data corruption: in bytes,"
254 0 : << " total size = " << totalsize << ", artdaq fragment header = " << artdaqheader << ", metadata = " << metadata << ", payload = " << payload;
255 :
256 0 : TLOG(TLVL_ERROR) << errmsg.str();
257 :
258 0 : if (quitOnFragmentIntegrityProblem_)
259 : {
260 0 : throw cet::exception("ListenTransferWrapper") << errmsg.str(); // NOLINT(cert-err60-cpp)
261 : }
262 :
263 0 : return;
264 0 : }
265 :
266 0 : auto findloc = std::find(allowedFragmentTypes_.begin(), allowedFragmentTypes_.end(), static_cast<int>(type));
267 :
268 0 : if (findloc == allowedFragmentTypes_.end())
269 : {
270 0 : std::stringstream errmsg;
271 0 : errmsg << "Error: artdaq fragment appears to have type "
272 0 : << type << ", not found in the allowed fragment types list";
273 :
274 0 : TLOG(TLVL_ERROR) << errmsg.str();
275 0 : if (quitOnFragmentIntegrityProblem_)
276 : {
277 0 : throw cet::exception("ListenTransferWrapper") << errmsg.str(); // NOLINT(cert-err60-cpp)
278 : }
279 :
280 0 : return;
281 0 : }
282 : }
283 :
284 0 : artdaq::ListenTransferWrapper::~ListenTransferWrapper()
285 : {
286 0 : artdaq::Globals::CleanUpGlobals();
287 0 : }
|