Line data Source code
1 : #include "TRACE/tracemf.h" // Pre-empt TRACE/trace.h from Fragment.hh.
2 : #include "artdaq-core/Data/Fragment.hh"
3 :
4 : #define TRACE_NAME "TransferWrapper"
5 :
6 : #include "artdaq/ArtModules/detail/TransferWrapper.hh"
7 :
8 : #include "artdaq-core/Utilities/ExceptionHandler.hh"
9 : #include "artdaq-core/Utilities/TimeUtils.hh"
10 : #include "artdaq/DAQdata/NetMonHeader.hh"
11 : #include "artdaq/ExternalComms/MakeCommanderPlugin.hh"
12 : #include "artdaq/TransferPlugins/MakeTransferPlugin.hh"
13 :
14 : #include "cetlib_except/exception.h"
15 : #include "fhiclcpp/ParameterSet.h"
16 :
17 : #include <csignal>
18 : #include <iostream>
19 : #include <limits>
20 : #include <memory>
21 : #include <sstream>
22 : #include <string>
23 :
namespace {
/// Most recent signal number recorded by signal_handler(); 0 means no signal has been seen yet.
volatile std::sig_atomic_t gSignalStatus{0};
}  // namespace
27 :
28 : /**
29 : * \brief Handle a Unix signal
30 : * \param signal Signal to handle
31 : */
32 0 : void signal_handler(int signal)
33 : {
34 0 : gSignalStatus = signal;
35 0 : }
36 :
37 0 : artdaq::TransferWrapper::TransferWrapper(const fhicl::ParameterSet& pset)
38 0 : : timeoutInUsecs_(pset.get<std::size_t>("timeoutInUsecs", 100000))
39 0 : , last_received_data_()
40 0 : , last_report_(std::chrono::steady_clock::now())
41 0 : , transfer_(nullptr)
42 0 : , commander_(nullptr)
43 0 : , pset_(pset)
44 0 : , dispatcherHost_(pset.get<std::string>("dispatcherHost", "localhost"))
45 0 : , dispatcherPort_(pset.get<std::string>("dispatcherPort", "5266"))
46 0 : , serverUrl_(pset.get<std::string>("server_url", "http://" + dispatcherHost_ + ":" + dispatcherPort_ + "/RPC2"))
47 0 : , maxEventsBeforeInit_(pset.get<std::size_t>("maxEventsBeforeInit", 5))
48 0 : , allowedFragmentTypes_(pset.get<std::vector<int>>("allowedFragmentTypes", {static_cast<int>(artdaq::Fragment::DataFragmentType),
49 : static_cast<int>(artdaq::Fragment::InitFragmentType),
50 : static_cast<int>(artdaq::Fragment::EndOfRunFragmentType),
51 : static_cast<int>(artdaq::Fragment::EndOfSubrunFragmentType),
52 : static_cast<int>(artdaq::Fragment::RunDataFragmentType),
53 : static_cast<int>(artdaq::Fragment::SubrunDataFragmentType)}))
54 0 : , runningStateTimeout_(pset.get<double>("dispatcherConnectTimeout", 0))
55 0 : , runningStateInterval_us_(pset.get<size_t>("dispatcherConnectRetryInterval_us", 1000000))
56 0 : , quitOnFragmentIntegrityProblem_(pset.get<bool>("quitOnFragmentIntegrityProblem", true))
57 0 : , multi_run_mode_(pset.get<bool>("multi_run_mode", false))
58 0 : , monitorRegistered_(false)
59 : {
60 0 : std::signal(SIGINT, signal_handler);
61 :
62 : try
63 : {
64 0 : if (metricMan)
65 : {
66 0 : metricMan->initialize(pset.get<fhicl::ParameterSet>("metrics", fhicl::ParameterSet()), "Online Monitor");
67 0 : metricMan->do_start();
68 : }
69 : }
70 0 : catch (...)
71 : {
72 0 : ExceptionHandler(ExceptionHandlerRethrow::no, "TransferWrapper: could not configure metrics");
73 0 : }
74 :
75 : // Clamp possible values
76 0 : if (runningStateInterval_us_ < 1000)
77 : {
78 0 : TLOG(TLVL_WARNING) << "Invalid value " << runningStateInterval_us_ << " us detected for dispatcherConnectRetryInterval_us. Setting to 1000 us";
79 0 : runningStateInterval_us_ = 1000;
80 : }
81 0 : if (runningStateInterval_us_ > 30000000)
82 : {
83 0 : TLOG(TLVL_WARNING) << "Invalid value " << runningStateInterval_us_ << " us detected for dispatcherConnectRetryInterval_us. Setting to 30,000,000 us";
84 0 : runningStateInterval_us_ = 30000000;
85 : }
86 :
87 0 : fhicl::ParameterSet new_pset(pset);
88 0 : if (!new_pset.has_key("server_url"))
89 : {
90 0 : new_pset.put<std::string>("server_url", serverUrl_);
91 : }
92 :
93 0 : artdaq::Commandable c;
94 0 : commander_ = MakeCommanderPlugin(new_pset, c);
95 0 : }
96 :
97 0 : artdaq::FragmentPtrs artdaq::TransferWrapper::receiveMessage()
98 : {
99 0 : artdaq::FragmentPtrs fragmentPtrs;
100 0 : bool receivedFragment = false;
101 : static bool initialized = false;
102 : static size_t fragments_received = 0;
103 :
104 0 : while (gSignalStatus == 0)
105 : {
106 0 : receivedFragment = false;
107 0 : auto fragmentPtr = std::make_unique<artdaq::Fragment>();
108 :
109 0 : while (!receivedFragment)
110 : {
111 0 : if (gSignalStatus != 0)
112 : {
113 0 : TLOG(TLVL_INFO) << "Ctrl-C appears to have been hit";
114 0 : return fragmentPtrs;
115 : }
116 0 : if (!monitorRegistered_)
117 : {
118 0 : registerMonitor();
119 0 : if (!monitorRegistered_)
120 : {
121 0 : return fragmentPtrs;
122 : }
123 : }
124 :
125 : try
126 : {
127 0 : auto result = transfer_->receiveFragment(*fragmentPtr, timeoutInUsecs_);
128 :
129 0 : if (result >= artdaq::TransferInterface::RECV_SUCCESS)
130 : {
131 0 : receivedFragment = true;
132 0 : fragments_received++;
133 :
134 : static size_t cntr = 0;
135 0 : auto mod = ++cntr % 10;
136 0 : auto suffix = "-th";
137 0 : if (mod == 1)
138 : {
139 0 : suffix = "-st";
140 : }
141 0 : if (mod == 2)
142 : {
143 0 : suffix = "-nd";
144 : }
145 0 : if (mod == 3)
146 : {
147 0 : suffix = "-rd";
148 : }
149 0 : TLOG(TLVL_INFO) << "Received " << cntr << suffix << " event, "
150 0 : << "seqID == " << fragmentPtr->sequenceID()
151 0 : << ", type == " << fragmentPtr->typeString();
152 0 : last_received_data_ = std::chrono::steady_clock::now();
153 0 : continue;
154 0 : }
155 0 : if (result == artdaq::TransferInterface::DATA_END)
156 : {
157 0 : TLOG(TLVL_ERROR) << "Transfer Plugin disconnected or other unrecoverable error. Shutting down.";
158 0 : return fragmentPtrs;
159 : }
160 : else
161 : {
162 0 : auto tlvl = TLVL_DEBUG + 33;
163 0 : if (artdaq::TimeUtils::GetElapsedTime(last_report_) > 1.0 && artdaq::TimeUtils::GetElapsedTime(last_received_data_) > 1.0)
164 : {
165 0 : tlvl = TLVL_WARNING;
166 0 : last_report_ = std::chrono::steady_clock::now();
167 : }
168 :
169 0 : auto last_received_milliseconds = artdaq::TimeUtils::GetElapsedTimeMilliseconds(last_received_data_);
170 :
171 : // 02-Jun-2018, KAB: added status/result printout
172 : // to-do: add another else clause that explicitly checks for RECV_TIMEOUT
173 0 : TLOG(tlvl) << "Timeout occurred in call to transfer_->receiveFragmentFrom; will try again"
174 0 : << ", status = " << result << ", last received data " << last_received_milliseconds << " ms ago.";
175 : }
176 : }
177 0 : catch (...)
178 : {
179 0 : ExceptionHandler(ExceptionHandlerRethrow::yes,
180 : "Problem receiving data in TransferWrapper::receiveMessage");
181 0 : }
182 : }
183 :
184 0 : if (fragmentPtr->type() == artdaq::Fragment::EndOfDataFragmentType)
185 : {
186 0 : if (!multi_run_mode_)
187 : {
188 0 : TLOG(TLVL_DEBUG + 32) << "Received shutdown message, returning";
189 0 : fragmentPtrs.push_back(std::move(fragmentPtr));
190 : }
191 0 : return fragmentPtrs;
192 : }
193 :
194 0 : checkIntegrity(*fragmentPtr);
195 :
196 0 : if (initialized || fragmentPtr->type() == artdaq::Fragment::InitFragmentType)
197 : {
198 0 : initialized = true;
199 0 : fragmentPtrs.push_back(std::move(fragmentPtr));
200 0 : break;
201 : }
202 :
203 0 : if (fragments_received > maxEventsBeforeInit_)
204 : {
205 0 : throw cet::exception("TransferWrapper") << "First " << maxEventsBeforeInit_ << " events received did not include the \"Init\" event containing necessary info for art; exiting..."; // NOLINT(cert-err60-cpp)
206 : }
207 0 : }
208 :
209 0 : return fragmentPtrs;
210 0 : }
211 :
212 0 : std::shared_ptr<ArtdaqEvent> artdaq::TransferWrapper::receiveMessages()
213 : {
214 0 : std::shared_ptr<ArtdaqEvent> output = std::make_shared<ArtdaqEvent>();
215 :
216 0 : auto ptrs = receiveMessage();
217 0 : for (auto& ptr : ptrs)
218 : {
219 0 : auto fragType = ptr->type();
220 0 : auto fragPtr = ptr.release();
221 0 : ptr.reset(nullptr);
222 :
223 0 : if (output->fragments.count(fragType) == 0u)
224 : {
225 0 : output->fragments[fragType] = std::make_unique<artdaq::Fragments>();
226 : }
227 :
228 0 : output->fragments[fragType]->emplace_back(std::move(*fragPtr));
229 : }
230 :
231 0 : return output;
232 0 : }
233 :
234 0 : void artdaq::TransferWrapper::checkIntegrity(const artdaq::Fragment& fragment) const
235 : {
236 0 : const size_t artdaqheader = artdaq::detail::RawFragmentHeader::num_words() *
237 : sizeof(artdaq::detail::RawFragmentHeader::RawDataType);
238 0 : const auto payload = static_cast<size_t>(fragment.dataEndBytes() - fragment.dataBeginBytes());
239 0 : const size_t metadata = sizeof(artdaq::NetMonHeader);
240 0 : const size_t totalsize = fragment.sizeBytes();
241 :
242 0 : const auto type = static_cast<size_t>(fragment.type());
243 :
244 0 : if (totalsize != artdaqheader + metadata + payload)
245 : {
246 0 : std::stringstream errmsg;
247 0 : errmsg << "Error: artdaq fragment of type " << fragment.typeString() << ", sequence ID " << fragment.sequenceID() << " has internally inconsistent measures of its size, signalling data corruption: in bytes,"
248 0 : << " total size = " << totalsize << ", artdaq fragment header = " << artdaqheader << ", metadata = " << metadata << ", payload = " << payload;
249 :
250 0 : TLOG(TLVL_ERROR) << errmsg.str();
251 :
252 0 : if (quitOnFragmentIntegrityProblem_)
253 : {
254 0 : throw cet::exception("TransferWrapper") << errmsg.str(); // NOLINT(cert-err60-cpp)
255 : }
256 :
257 0 : return;
258 0 : }
259 :
260 0 : auto findloc = std::find(allowedFragmentTypes_.begin(), allowedFragmentTypes_.end(), static_cast<int>(type));
261 :
262 0 : if (findloc == allowedFragmentTypes_.end())
263 : {
264 0 : std::stringstream errmsg;
265 0 : errmsg << "Error: artdaq fragment appears to have type "
266 0 : << type << ", not found in the allowed fragment types list";
267 :
268 0 : TLOG(TLVL_ERROR) << errmsg.str();
269 0 : if (quitOnFragmentIntegrityProblem_)
270 : {
271 0 : throw cet::exception("TransferWrapper") << errmsg.str(); // NOLINT(cert-err60-cpp)
272 : }
273 :
274 0 : return;
275 0 : }
276 : }
277 :
278 0 : void artdaq::TransferWrapper::registerMonitor()
279 : {
280 : try
281 : {
282 0 : transfer_.reset(nullptr);
283 0 : transfer_ = MakeTransferPlugin(pset_, "transfer_plugin", TransferInterface::Role::kReceive);
284 : }
285 0 : catch (...)
286 : {
287 0 : ExceptionHandler(ExceptionHandlerRethrow::yes,
288 : "TransferWrapper: failure in call to MakeTransferPlugin");
289 0 : }
290 :
291 0 : auto start = std::chrono::steady_clock::now();
292 0 : auto sts = getDispatcherStatus();
293 0 : while (sts != "Running" && (runningStateTimeout_ == 0 || TimeUtils::GetElapsedTime(start) < runningStateTimeout_))
294 : {
295 0 : TLOG(TLVL_DEBUG + 32) << "Dispatcher state: " << sts;
296 0 : if (gSignalStatus != 0)
297 : {
298 0 : TLOG(TLVL_INFO) << "Ctrl-C appears to have been hit";
299 0 : return;
300 : }
301 0 : TLOG(TLVL_INFO) << "Waited " << std::fixed << std::setprecision(2) << TimeUtils::GetElapsedTime(start) << " s / " << runningStateTimeout_ << " s for Dispatcher to enter the Running state (state=" << sts << ")";
302 0 : usleep(runningStateInterval_us_);
303 0 : sts = getDispatcherStatus();
304 : }
305 0 : if (sts != "Running")
306 : {
307 0 : return;
308 : }
309 :
310 0 : auto dispatcherConfig = pset_.get<fhicl::ParameterSet>("dispatcher_config");
311 :
312 0 : int retry = 3;
313 :
314 0 : while (retry > 0)
315 : {
316 0 : label_ = dispatcherConfig.get<std::string>("unique_label") + "T" + std::to_string(time(0));
317 0 : dispatcherConfig.erase("unique_label");
318 0 : dispatcherConfig.put<std::string>("unique_label", label_);
319 0 : TLOG(TLVL_INFO) << "Attempting to register this monitor (\"" << label_
320 0 : << "\") with the dispatcher aggregator";
321 :
322 0 : auto status = commander_->send_register_monitor(dispatcherConfig.to_string());
323 :
324 0 : TLOG(TLVL_INFO) << "Response from dispatcher is \"" << status << "\"";
325 :
326 0 : if (status == "Success")
327 : {
328 0 : monitorRegistered_ = true;
329 0 : break;
330 : }
331 :
332 0 : TLOG(TLVL_WARNING) << "Error in TransferWrapper: attempt to register with dispatcher did not result in the \"Success\" response";
333 0 : usleep(100000);
334 :
335 0 : retry--;
336 0 : }
337 0 : }
338 :
339 0 : void artdaq::TransferWrapper::unregisterMonitor()
340 : {
341 0 : if (!monitorRegistered_)
342 : {
343 0 : TLOG(TLVL_WARNING) << "The function to unregister the monitor was called, but the monitor doesn't appear to be registered";
344 0 : return;
345 : }
346 :
347 0 : auto start_time = std::chrono::steady_clock::now();
348 0 : bool waiting = true;
349 0 : while (artdaq::TimeUtils::GetElapsedTime(start_time) < 5.0 && waiting)
350 : {
351 0 : std::string sts = getDispatcherStatus();
352 :
353 0 : if (sts.empty())
354 0 : return;
355 :
356 0 : if (sts == "busy")
357 : {
358 0 : TLOG(TLVL_INFO) << "The Dispatcher returned \"busy\", will wait 0.5s and retry";
359 0 : usleep(500000);
360 0 : continue;
361 0 : }
362 :
363 0 : if (sts != "Running" && sts != "Ready")
364 : {
365 0 : TLOG(TLVL_WARNING) << "The Dispatcher is not in the Running or Ready state, will not attempt to unregister (state: " << sts << ")";
366 0 : return;
367 : }
368 0 : waiting = false;
369 0 : }
370 0 : if (waiting)
371 : {
372 0 : TLOG(TLVL_WARNING) << "A timeout occurred waiting for the Dispatcher to leave the \"busy\" state, will not attempt to unregister";
373 0 : return;
374 : }
375 :
376 0 : int retry = 3;
377 0 : while (retry > 0)
378 : {
379 0 : TLOG(TLVL_INFO) << "Requesting that this monitor (" << label_
380 0 : << ") be unregistered from the dispatcher aggregator";
381 :
382 0 : auto status = commander_->send_unregister_monitor(label_);
383 :
384 0 : TLOG(TLVL_INFO) << "Response from dispatcher is \"" << status << "\"";
385 :
386 0 : if (status == "Success")
387 : {
388 0 : break;
389 : }
390 0 : else if (status == "busy")
391 : {
392 0 : TLOG(TLVL_DEBUG + 32) << "The Dispatcher returned \"busy\", will retry in 0.5s";
393 : }
394 : else
395 : {
396 0 : TLOG(TLVL_WARNING) << "The Dispatcher returned status " << status << " when attempting to unregister this monitor!";
397 : // throw cet::exception("TransferWrapper") << "Error in TransferWrapper: attempt to unregister with dispatcher did not result in the \"Success\" response";
398 : }
399 0 : retry--;
400 0 : usleep(500000);
401 0 : }
402 :
403 0 : TLOG(TLVL_INFO) << "Successfully unregistered the monitor from the Dispatcher";
404 0 : monitorRegistered_ = false;
405 : }
406 :
407 0 : std::string artdaq::TransferWrapper::getDispatcherStatus()
408 : {
409 : try
410 : {
411 0 : return commander_->send_status();
412 : }
413 0 : catch (std::exception const& ex)
414 : {
415 0 : TLOG(TLVL_WARNING) << "An exception was thrown trying to collect the Dispatcher's status. Most likely cause is the application is no longer running.";
416 0 : return "";
417 0 : }
418 : }
419 :
420 0 : artdaq::TransferWrapper::~TransferWrapper()
421 : {
422 0 : if (monitorRegistered_)
423 : {
424 : try
425 : {
426 0 : unregisterMonitor();
427 : }
428 0 : catch (...)
429 : {
430 0 : ExceptionHandler(ExceptionHandlerRethrow::no,
431 : "An exception occurred when trying to unregister monitor during TransferWrapper's destruction");
432 0 : }
433 : }
434 0 : artdaq::Globals::CleanUpGlobals();
435 0 : }
|