Line data Source code
1 : #include "TRACE/tracemf.h"
2 : #include "artdaq/DAQdata/Globals.hh" // include these 2 first to get tracemf.h -
3 : #define TRACE_NAME (app_name + "_RoutingManagerCore").c_str() // before trace.h
4 :
5 : #include "artdaq/Application/RoutingManagerCore.hh"
6 :
7 : #include "artdaq-core/Utilities/ExceptionHandler.hh"
8 : #include "artdaq/DAQdata/TCP_listen_fd.hh"
9 : #include "artdaq/RoutingPolicies/makeRoutingManagerPolicy.hh"
10 :
11 : #include "fhiclcpp/ParameterSet.h"
12 :
13 : #include <arpa/inet.h>
14 : #include <netdb.h>
15 : #include <pthread.h>
16 : #include <sched.h>
17 : #include <sys/time.h>
18 : #include <sys/un.h>
19 : #include <algorithm>
20 : #include <memory>
21 :
22 : const std::string artdaq::RoutingManagerCore::
23 : TABLE_UPDATES_STAT_KEY("RoutingManagerCoreTableUpdates");
24 : const std::string artdaq::RoutingManagerCore::
25 : TOKENS_RECEIVED_STAT_KEY("RoutingManagerCoreTokensReceived");
26 : const std::string artdaq::RoutingManagerCore::
27 : CURRENT_TABLE_INTERVAL_STAT_KEY("RoutingManagerCoreCurrentTableInterval");
28 :
29 0 : artdaq::RoutingManagerCore::RoutingManagerCore()
30 0 : : shutdown_requested_(false)
31 0 : , stop_requested_(true)
32 0 : , pause_requested_(false)
33 0 : , statsHelperPtr_(new artdaq::StatisticsHelper())
34 : {
35 0 : TLOG(TLVL_DEBUG + 32) << "Constructor";
36 0 : statsHelperPtr_->addMonitoredQuantityName(TABLE_UPDATES_STAT_KEY);
37 0 : statsHelperPtr_->addMonitoredQuantityName(TOKENS_RECEIVED_STAT_KEY);
38 0 : statsHelperPtr_->addMonitoredQuantityName(CURRENT_TABLE_INTERVAL_STAT_KEY);
39 0 : }
40 :
41 0 : artdaq::RoutingManagerCore::~RoutingManagerCore()
42 : {
43 0 : TLOG(TLVL_DEBUG + 32) << "Destructor";
44 0 : artdaq::StatisticsCollection::getInstance().requestStop();
45 0 : token_receiver_->stopTokenReception(true);
46 0 : }
47 :
48 0 : bool artdaq::RoutingManagerCore::initialize(fhicl::ParameterSet const& pset, uint64_t /*unused*/, uint64_t /*unused*/)
49 : {
50 0 : TLOG(TLVL_DEBUG + 32) << "initialize method called with "
51 0 : << "ParameterSet = \"" << pset.to_string()
52 0 : << "\".";
53 :
54 : // pull out the relevant parts of the ParameterSet
55 0 : fhicl::ParameterSet daq_pset;
56 : try
57 : {
58 0 : daq_pset = pset.get<fhicl::ParameterSet>("daq");
59 : }
60 0 : catch (...)
61 : {
62 0 : TLOG(TLVL_ERROR)
63 0 : << "Unable to find the DAQ parameters in the initialization "
64 0 : << "ParameterSet: \"" + pset.to_string() + "\".";
65 0 : return false;
66 0 : }
67 :
68 0 : if (daq_pset.has_key("rank"))
69 : {
70 0 : if (my_rank >= 0 && daq_pset.get<int>("rank") != my_rank)
71 : {
72 0 : TLOG(TLVL_WARNING) << "Routing Manager rank specified at startup is different than rank specified at configure! Using rank received at configure!";
73 : }
74 0 : my_rank = daq_pset.get<int>("rank");
75 : }
76 0 : if (my_rank == -1)
77 : {
78 0 : TLOG(TLVL_ERROR) << "Routing Manager rank not specified at startup or in configuration! Aborting";
79 0 : exit(1);
80 : }
81 :
82 : try
83 : {
84 0 : policy_pset_ = daq_pset.get<fhicl::ParameterSet>("policy");
85 : }
86 0 : catch (...)
87 : {
88 0 : TLOG(TLVL_ERROR)
89 0 : << "Unable to find the policy parameters in the DAQ initialization ParameterSet: \"" + daq_pset.to_string() + "\".";
90 0 : return false;
91 0 : }
92 :
93 : try
94 : {
95 0 : token_receiver_pset_ = daq_pset.get<fhicl::ParameterSet>("token_receiver");
96 : }
97 0 : catch (...)
98 : {
99 0 : TLOG(TLVL_ERROR)
100 0 : << "Unable to find the token_receiver parameters in the DAQ initialization ParameterSet: \"" + daq_pset.to_string() + "\".";
101 0 : return false;
102 0 : }
103 :
104 : // pull out the Metric part of the ParameterSet
105 0 : fhicl::ParameterSet metric_pset;
106 : try
107 : {
108 0 : metric_pset = daq_pset.get<fhicl::ParameterSet>("metrics");
109 : }
110 0 : catch (...)
111 0 : {} // OK if there's no metrics table defined in the FHiCL
112 :
113 0 : if (metric_pset.is_empty())
114 : {
115 0 : TLOG(TLVL_INFO) << "No metric plugins appear to be defined";
116 : }
117 : try
118 : {
119 0 : metricMan->initialize(metric_pset, app_name);
120 : }
121 0 : catch (...)
122 : {
123 0 : ExceptionHandler(ExceptionHandlerRethrow::no,
124 : "Error loading metrics in RoutingManagerCore::initialize()");
125 0 : }
126 :
127 : // create the requested RoutingPolicy
128 0 : auto policy_plugin_spec = policy_pset_.get<std::string>("policy", "");
129 0 : if (policy_plugin_spec.length() == 0)
130 : {
131 0 : TLOG(TLVL_ERROR)
132 0 : << "No fragment generator (parameter name = \"policy\") was "
133 0 : << "specified in the policy ParameterSet. The "
134 0 : << "DAQ initialization PSet was \"" << daq_pset.to_string() << "\".";
135 0 : return false;
136 : }
137 : try
138 : {
139 0 : policy_ = artdaq::makeRoutingManagerPolicy(policy_plugin_spec, policy_pset_);
140 : }
141 0 : catch (...)
142 : {
143 0 : std::stringstream exception_string;
144 : exception_string << "Exception thrown during initialization of policy of type \""
145 0 : << policy_plugin_spec << "\"";
146 :
147 0 : ExceptionHandler(ExceptionHandlerRethrow::no, exception_string.str());
148 :
149 0 : TLOG(TLVL_DEBUG + 32) << "FHiCL parameter set used to initialize the policy which threw an exception: " << policy_pset_.to_string();
150 :
151 0 : return false;
152 0 : }
153 :
154 0 : rt_priority_ = daq_pset.get<int>("rt_priority", 0);
155 0 : max_table_update_interval_ms_ = daq_pset.get<size_t>("table_update_interval_ms", 1000);
156 0 : current_table_interval_ms_ = max_table_update_interval_ms_;
157 0 : table_update_high_fraction_ = daq_pset.get<double>("table_update_interval_high_frac", 0.75);
158 0 : table_update_low_fraction_ = daq_pset.get<double>("table_update_interval_low_frac", 0.5);
159 :
160 : // fetch the monitoring parameters and create the MonitoredQuantity instances
161 0 : statsHelperPtr_->createCollectors(daq_pset, 100, 30.0, 60.0, TABLE_UPDATES_STAT_KEY);
162 :
163 : // create the requested TokenReceiver
164 0 : token_receiver_ = std::make_unique<TokenReceiver>(token_receiver_pset_, policy_, max_table_update_interval_ms_);
165 0 : token_receiver_->setStatsHelper(statsHelperPtr_, TOKENS_RECEIVED_STAT_KEY);
166 0 : token_receiver_->startTokenReception();
167 0 : token_receiver_->pauseTokenReception();
168 :
169 0 : table_listen_port_ = daq_pset.get<int>("table_update_port", 35556);
170 :
171 0 : shutdown_requested_.store(true);
172 0 : if (listen_thread_ && listen_thread_->joinable())
173 : {
174 0 : listen_thread_->join();
175 : }
176 0 : shutdown_requested_.store(false);
177 0 : TLOG(TLVL_INFO) << "Starting Listener Thread";
178 :
179 : try
180 : {
181 0 : listen_thread_ = std::make_unique<boost::thread>(&RoutingManagerCore::listen_, this);
182 : }
183 0 : catch (const boost::exception& e)
184 : {
185 0 : TLOG(TLVL_ERROR) << "Caught boost::exception starting TCP Socket Listen thread: " << boost::diagnostic_information(e) << ", errno=" << errno;
186 0 : std::cerr << "Caught boost::exception starting TCP Socket Listen thread: " << boost::diagnostic_information(e) << ", errno=" << errno << std::endl;
187 0 : exit(5);
188 0 : }
189 0 : return true;
190 0 : }
191 :
192 0 : bool artdaq::RoutingManagerCore::start(art::RunID id, uint64_t /*unused*/, uint64_t /*unused*/)
193 : {
194 0 : run_id_ = id;
195 0 : stop_requested_.store(false);
196 0 : pause_requested_.store(false);
197 :
198 0 : statsHelperPtr_->resetStatistics();
199 :
200 0 : metricMan->do_start();
201 0 : table_update_count_ = 0;
202 0 : token_receiver_->setRunNumber(run_id_.run());
203 0 : token_receiver_->resumeTokenReception();
204 :
205 0 : TLOG(TLVL_INFO) << "Started run " << run_id_.run();
206 0 : return true;
207 : }
208 :
209 0 : bool artdaq::RoutingManagerCore::stop(uint64_t /*unused*/, uint64_t /*unused*/)
210 : {
211 0 : TLOG(TLVL_INFO) << "Stopping run " << run_id_.run()
212 0 : << " after " << table_update_count_ << " table updates."
213 0 : << " and " << token_receiver_->getReceivedTokenCount() << " received tokens.";
214 0 : stop_requested_.store(true);
215 0 : token_receiver_->pauseTokenReception();
216 0 : run_id_ = art::RunID::flushRun();
217 0 : return true;
218 : }
219 :
220 0 : bool artdaq::RoutingManagerCore::pause(uint64_t /*unused*/, uint64_t /*unused*/)
221 : {
222 0 : TLOG(TLVL_INFO) << "Pausing run " << run_id_.run()
223 0 : << " after " << table_update_count_ << " table updates."
224 0 : << " and " << token_receiver_->getReceivedTokenCount() << " received tokens.";
225 0 : pause_requested_.store(true);
226 0 : return true;
227 : }
228 :
229 0 : bool artdaq::RoutingManagerCore::resume(uint64_t /*unused*/, uint64_t /*unused*/)
230 : {
231 0 : TLOG(TLVL_DEBUG + 32) << "Resuming run " << run_id_.run();
232 0 : pause_requested_.store(false);
233 0 : metricMan->do_start();
234 0 : return true;
235 : }
236 :
237 0 : bool artdaq::RoutingManagerCore::shutdown(uint64_t /*unused*/)
238 : {
239 0 : shutdown_requested_.store(true);
240 0 : if (listen_thread_ && listen_thread_->joinable())
241 : {
242 0 : listen_thread_->join();
243 : }
244 0 : token_receiver_->stopTokenReception();
245 0 : policy_.reset();
246 0 : metricMan->shutdown();
247 0 : return true;
248 : }
249 :
250 0 : bool artdaq::RoutingManagerCore::soft_initialize(fhicl::ParameterSet const& pset, uint64_t timeout, uint64_t timestamp)
251 : {
252 0 : TLOG(TLVL_INFO) << "soft_initialize method called with "
253 0 : << "ParameterSet = \"" << pset.to_string()
254 0 : << "\".";
255 0 : return initialize(pset, timeout, timestamp);
256 : }
257 :
258 0 : bool artdaq::RoutingManagerCore::reinitialize(fhicl::ParameterSet const& pset, uint64_t timeout, uint64_t timestamp)
259 : {
260 0 : TLOG(TLVL_INFO) << "reinitialize method called with "
261 0 : << "ParameterSet = \"" << pset.to_string()
262 0 : << "\".";
263 0 : return initialize(pset, timeout, timestamp);
264 : }
265 :
266 0 : void artdaq::RoutingManagerCore::process_event_table()
267 : {
268 0 : if (rt_priority_ > 0)
269 : {
270 : #pragma GCC diagnostic push
271 : #pragma GCC diagnostic ignored "-Wmissing-field-initializers"
272 0 : sched_param s_param = {};
273 0 : s_param.sched_priority = rt_priority_;
274 0 : if (pthread_setschedparam(pthread_self(), SCHED_RR, &s_param) != 0)
275 : {
276 0 : TLOG(TLVL_WARNING) << "setting realtime priority failed";
277 : }
278 : #pragma GCC diagnostic pop
279 : }
280 :
281 : // try-catch block here?
282 :
283 : // how to turn RT PRI off?
284 0 : if (rt_priority_ > 0)
285 : {
286 : #pragma GCC diagnostic push
287 : #pragma GCC diagnostic ignored "-Wmissing-field-initializers"
288 0 : sched_param s_param = {};
289 0 : s_param.sched_priority = rt_priority_;
290 0 : int status = pthread_setschedparam(pthread_self(), SCHED_RR, &s_param);
291 0 : if (status != 0)
292 : {
293 0 : TLOG(TLVL_ERROR)
294 0 : << "Failed to set realtime priority to " << rt_priority_
295 0 : << ", return code = " << status;
296 : }
297 : #pragma GCC diagnostic pop
298 : }
299 :
300 : // MPI_Barrier(local_group_comm_);
301 :
302 0 : TLOG(TLVL_DEBUG + 32) << "Sending initial table.";
303 0 : auto startTime = artdaq::MonitoredQuantity::getCurrentTime();
304 0 : auto nextSendTime = startTime;
305 : double delta_time;
306 0 : while (!stop_requested_ && !pause_requested_)
307 : {
308 0 : receive_();
309 0 : if (policy_->GetRoutingMode() == detail::RoutingManagerMode::EventBuilding || policy_->GetRoutingMode() == detail::RoutingManagerMode::RequestBasedEventBuilding)
310 : {
311 0 : startTime = artdaq::MonitoredQuantity::getCurrentTime();
312 :
313 0 : if (startTime >= nextSendTime)
314 : {
315 0 : auto table = policy_->GetCurrentTable();
316 :
317 0 : if (table.empty())
318 : {
319 0 : TLOG(TLVL_WARNING) << "Routing Policy generated Empty table for this routing interval (" << current_table_interval_ms_ << " ms)! This may indicate issues with the receivers, if it persists."
320 0 : << " Next seqID=" << policy_->GetNextSequenceID() << ", Policy held tokens=" << policy_->GetHeldTokenCount();
321 : }
322 : else
323 : {
324 0 : send_event_table(table);
325 0 : ++table_update_count_;
326 0 : delta_time = artdaq::MonitoredQuantity::getCurrentTime() - startTime;
327 0 : statsHelperPtr_->addSample(TABLE_UPDATES_STAT_KEY, delta_time);
328 0 : TLOG(TLVL_DEBUG + 34) << "process_fragments TABLE_UPDATES_STAT_KEY=" << delta_time;
329 :
330 0 : bool readyToReport = statsHelperPtr_->readyToReport();
331 0 : if (readyToReport)
332 : {
333 0 : std::string statString = buildStatisticsString_();
334 0 : TLOG(TLVL_INFO) << statString;
335 0 : sendMetrics_();
336 0 : }
337 : }
338 :
339 0 : auto max_tokens = policy_->GetMaxNumberOfTokens();
340 0 : if (max_tokens > 0)
341 : {
342 0 : auto frac = policy_->GetTokensUsedSinceLastUpdate() / static_cast<double>(max_tokens);
343 0 : policy_->ResetTokensUsedSinceLastUpdate();
344 0 : if (frac > table_update_high_fraction_) current_table_interval_ms_ = 9 * current_table_interval_ms_ / 10;
345 0 : if (frac < table_update_low_fraction_) current_table_interval_ms_ = 11 * current_table_interval_ms_ / 10;
346 0 : if (current_table_interval_ms_ > max_table_update_interval_ms_) current_table_interval_ms_ = max_table_update_interval_ms_;
347 0 : if (current_table_interval_ms_ < 1) current_table_interval_ms_ = 1;
348 : }
349 0 : nextSendTime = startTime + current_table_interval_ms_ / 1000.0;
350 0 : TLOG(TLVL_DEBUG + 32) << "current_table_interval_ms is now " << current_table_interval_ms_;
351 0 : statsHelperPtr_->addSample(CURRENT_TABLE_INTERVAL_STAT_KEY, current_table_interval_ms_ / 1000.0);
352 0 : }
353 : else
354 : {
355 0 : usleep(current_table_interval_ms_ * 10); // 1/100 of the table update interval
356 : }
357 : }
358 : }
359 :
360 0 : TLOG(TLVL_DEBUG + 32) << "stop_requested_ is " << stop_requested_ << ", pause_requested_ is " << pause_requested_ << ", exiting process_event_table loop";
361 0 : policy_->Reset();
362 0 : metricMan->do_stop();
363 0 : }
364 :
365 0 : void artdaq::RoutingManagerCore::send_event_table(detail::RoutingPacket packet)
366 : {
367 0 : std::lock_guard<std::mutex> lk(fd_mutex_);
368 0 : for (auto& dest : connected_fds_)
369 : {
370 0 : for (auto& connected_fd : dest.second)
371 : {
372 0 : auto header = detail::RoutingPacketHeader(packet.size());
373 0 : TLOG(TLVL_DEBUG + 32) << "Sending table information for " << header.nEntries << " events to destination " << dest.first;
374 0 : TRACE(16, "headerData:0x%016lx%016lx packetData:0x%016lx%016lx", ((unsigned long*)&header)[0], ((unsigned long*)&header)[1], ((unsigned long*)&packet[0])[0], ((unsigned long*)&packet[0])[1]); // NOLINT
375 0 : auto sts = write(connected_fd, &header, sizeof(header));
376 0 : if (sts != sizeof(header))
377 : {
378 0 : TLOG(TLVL_ERROR) << "Error sending routing header to fd " << connected_fd << ", rank " << dest.first;
379 : }
380 : else
381 : {
382 0 : sts = write(connected_fd, &packet[0], packet.size() * sizeof(detail::RoutingPacketEntry));
383 0 : if (sts != static_cast<ssize_t>(packet.size() * sizeof(detail::RoutingPacketEntry)))
384 : {
385 0 : TLOG(TLVL_ERROR) << "Error sending routing table. sts=" << sts << "/" << packet.size() * sizeof(detail::RoutingPacketEntry) << ", fd=" << connected_fd << ", rank=" << dest.first;
386 : }
387 : }
388 : }
389 : }
390 0 : }
391 :
392 0 : std::string artdaq::RoutingManagerCore::report(std::string const& /*unused*/) const
393 : {
394 0 : std::string resultString;
395 :
396 : // if we haven't been able to come up with any report so far, say so
397 0 : auto tmpString = app_name + " run number = " + std::to_string(run_id_.run()) + ", table updates sent = " + std::to_string(table_update_count_) + ", Receiver tokens received = " + std::to_string(token_receiver_->getReceivedTokenCount());
398 0 : return tmpString;
399 0 : }
400 :
401 0 : std::string artdaq::RoutingManagerCore::buildStatisticsString_() const
402 : {
403 0 : std::ostringstream oss;
404 0 : oss << app_name << " statistics:" << std::endl;
405 :
406 0 : auto mqPtr = artdaq::StatisticsCollection::getInstance().getMonitoredQuantity(TABLE_UPDATES_STAT_KEY);
407 0 : if (mqPtr != nullptr)
408 : {
409 0 : artdaq::MonitoredQuantityStats stats;
410 0 : mqPtr->getStats(stats);
411 0 : oss << " Table Update statistics: "
412 0 : << stats.recentSampleCount << " table updates sent at "
413 0 : << stats.recentSampleRate << " table updates/sec, , monitor window = "
414 0 : << stats.recentDuration << " sec" << std::endl;
415 0 : oss << " Average times per table update: ";
416 0 : if (stats.recentSampleRate > 0.0)
417 : {
418 0 : oss << " elapsed time = "
419 0 : << (1.0 / stats.recentSampleRate) << " sec";
420 : }
421 0 : }
422 :
423 0 : mqPtr = artdaq::StatisticsCollection::getInstance().getMonitoredQuantity(TOKENS_RECEIVED_STAT_KEY);
424 0 : if (mqPtr != nullptr)
425 : {
426 0 : artdaq::MonitoredQuantityStats stats;
427 0 : mqPtr->getStats(stats);
428 0 : oss << " Received Token statistics: "
429 0 : << stats.recentSampleCount << " tokens received at "
430 0 : << stats.recentSampleRate << " tokens/sec, , monitor window = "
431 0 : << stats.recentDuration << " sec" << std::endl;
432 0 : oss << " Average times per token: ";
433 0 : if (stats.recentSampleRate > 0.0)
434 : {
435 0 : oss << " elapsed time = "
436 0 : << (1.0 / stats.recentSampleRate) << " sec";
437 : }
438 0 : oss << ", input token wait time = "
439 0 : << mqPtr->getRecentValueSum() << " sec" << std::endl;
440 0 : }
441 :
442 0 : return oss.str();
443 0 : }
444 :
445 0 : void artdaq::RoutingManagerCore::sendMetrics_()
446 : {
447 0 : if (metricMan)
448 : {
449 0 : auto mqPtr = artdaq::StatisticsCollection::getInstance().getMonitoredQuantity(TABLE_UPDATES_STAT_KEY);
450 0 : if (mqPtr != nullptr)
451 : {
452 0 : artdaq::MonitoredQuantityStats stats;
453 0 : mqPtr->getStats(stats);
454 0 : metricMan->sendMetric("Table Update Count", stats.fullSampleCount, "updates", 1, MetricMode::LastPoint);
455 0 : metricMan->sendMetric("Table Update Rate", stats.recentSampleRate, "updates/sec", 1, MetricMode::Average);
456 0 : }
457 :
458 0 : mqPtr = artdaq::StatisticsCollection::getInstance().getMonitoredQuantity(TOKENS_RECEIVED_STAT_KEY);
459 0 : if (mqPtr != nullptr)
460 : {
461 0 : artdaq::MonitoredQuantityStats stats;
462 0 : mqPtr->getStats(stats);
463 0 : metricMan->sendMetric("Receiver Token Count", stats.fullSampleCount, "updates", 1, MetricMode::LastPoint);
464 0 : metricMan->sendMetric("Receiver Token Rate", stats.recentSampleRate, "updates/sec", 1, MetricMode::Average);
465 0 : metricMan->sendMetric("Total Receiver Token Wait Time", mqPtr->getRecentValueSum(), "seconds", 3, MetricMode::Average);
466 0 : }
467 :
468 0 : mqPtr = artdaq::StatisticsCollection::getInstance().getMonitoredQuantity(CURRENT_TABLE_INTERVAL_STAT_KEY);
469 0 : if (mqPtr.get() != nullptr)
470 : {
471 0 : artdaq::MonitoredQuantityStats stats;
472 0 : mqPtr->getStats(stats);
473 0 : metricMan->sendMetric("Table Update Interval", stats.recentValueAverage, "s", 3, MetricMode::Average);
474 0 : }
475 0 : }
476 0 : }
477 :
478 0 : void artdaq::RoutingManagerCore::listen_()
479 : {
480 0 : if (epoll_fd_ == -1)
481 : {
482 0 : epoll_fd_ = epoll_create1(0);
483 : }
484 0 : int listen_fd = -1;
485 0 : while (shutdown_requested_ == false)
486 : {
487 0 : TLOG(TLVL_DEBUG + 33) << "listen_: Listening/accepting new connections on port " << table_listen_port_;
488 0 : if (listen_fd == -1)
489 : {
490 0 : TLOG(TLVL_DEBUG + 32) << "listen_: Opening listener";
491 0 : listen_fd = TCP_listen_fd(table_listen_port_, 0);
492 : }
493 0 : if (listen_fd == -1)
494 : {
495 0 : TLOG(TLVL_DEBUG + 32) << "listen_: Error creating listen_fd!";
496 0 : break;
497 : }
498 :
499 : int res;
500 0 : timeval tv = {2, 0}; // maybe increase of some global "debugging" flag set???
501 : fd_set rfds;
502 0 : FD_ZERO(&rfds);
503 0 : FD_SET(listen_fd, &rfds); // NOLINT
504 :
505 0 : res = select(listen_fd + 1, &rfds, static_cast<fd_set*>(nullptr), static_cast<fd_set*>(nullptr), &tv);
506 0 : if (res > 0)
507 : {
508 : int sts;
509 : sockaddr_un un;
510 0 : socklen_t arglen = sizeof(un);
511 : int fd;
512 0 : TLOG(TLVL_DEBUG + 32) << "listen_: Calling accept";
513 0 : fd = accept(listen_fd, reinterpret_cast<sockaddr*>(&un), &arglen); // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
514 0 : TLOG(TLVL_DEBUG + 32) << "listen_: Done with accept";
515 :
516 0 : TLOG(TLVL_DEBUG + 32) << "listen_: Reading connect message";
517 0 : socklen_t lenlen = sizeof(tv);
518 : /*sts=*/
519 0 : setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, lenlen); // see man 7 socket.
520 0 : detail::RoutingRequest rch;
521 0 : uint64_t mark_us = TimeUtils::gettimeofday_us();
522 0 : sts = read(fd, &rch, sizeof(rch));
523 0 : uint64_t delta_us = TimeUtils::gettimeofday_us() - mark_us;
524 0 : TLOG(TLVL_DEBUG + 32) << "listen_: Read of connect message took " << delta_us << " microseconds.";
525 0 : if (sts != sizeof(rch))
526 : {
527 0 : TLOG(TLVL_DEBUG + 32) << "listen_: Wrong message header length received!";
528 0 : close(fd);
529 0 : continue;
530 0 : }
531 :
532 : // check for "magic" and valid source_id(aka rank)
533 0 : if (rch.header != ROUTING_MAGIC || !(rch.mode == detail::RoutingRequest::RequestMode::Connect))
534 : {
535 0 : TLOG(TLVL_DEBUG + 32) << "listen_: Wrong magic bytes in header! rch.header: " << std::hex << rch.header;
536 0 : close(fd);
537 0 : continue;
538 0 : }
539 :
540 : // now add (new) connection
541 0 : std::lock_guard<std::mutex> lk(fd_mutex_);
542 0 : connected_fds_[rch.rank].insert(fd);
543 : struct epoll_event ev;
544 0 : ev.data.fd = fd;
545 0 : ev.events = EPOLLIN;
546 0 : epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev);
547 0 : TLOG(TLVL_INFO) << "listen_: New fd is " << fd << " for table receiver rank " << rch.rank;
548 0 : }
549 : else
550 : {
551 0 : TLOG(TLVL_DEBUG + 34) << "listen_: No connections in timeout interval!";
552 : }
553 : }
554 :
555 0 : TLOG(TLVL_INFO) << "listen_: Shutting down connection listener";
556 0 : if (listen_fd != -1)
557 : {
558 0 : close(listen_fd);
559 : }
560 0 : std::lock_guard<std::mutex> lk(fd_mutex_);
561 0 : for (auto& fd_set : connected_fds_)
562 : {
563 0 : for (auto& fd : fd_set.second)
564 : {
565 0 : epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
566 0 : close(fd);
567 : }
568 : }
569 0 : connected_fds_.clear();
570 :
571 0 : } // listen_
572 :
573 0 : void artdaq::RoutingManagerCore::receive_()
574 : {
575 0 : if (epoll_fd_ == -1)
576 : {
577 0 : epoll_fd_ = epoll_create1(0);
578 : }
579 0 : std::vector<epoll_event> received_events(10);
580 :
581 0 : int nfds = 1;
582 0 : while (nfds > 0)
583 : {
584 0 : std::lock_guard<std::mutex> lk(fd_mutex_);
585 0 : nfds = epoll_wait(epoll_fd_, &received_events[0], received_events.size(), 1);
586 0 : if (nfds == -1)
587 : {
588 0 : TLOG(TLVL_ERROR) << "Error status received from epoll_wait, exiting with code " << EXIT_FAILURE << ", errno=" << errno << " (" << strerror(errno) << ")";
589 0 : perror("epoll_wait");
590 0 : exit(EXIT_FAILURE);
591 : }
592 :
593 0 : if (nfds > 0)
594 : {
595 0 : TLOG(TLVL_DEBUG + 35) << "Received " << nfds << " events on table sockets";
596 : }
597 0 : for (auto n = 0; n < nfds; ++n)
598 : {
599 0 : bool reading = true;
600 0 : int sts = 0;
601 0 : while (reading)
602 : {
603 0 : if ((received_events[n].events & EPOLLIN) != 0)
604 : {
605 0 : detail::RoutingRequest buff;
606 0 : auto stss = read(received_events[n].data.fd, &buff, sizeof(detail::RoutingRequest) - sts);
607 0 : sts += stss;
608 0 : if (stss == 0)
609 : {
610 0 : TLOG(TLVL_INFO) << "Received 0-size request from " << find_fd_(received_events[n].data.fd);
611 0 : reading = false;
612 : }
613 0 : else if (stss < 0 && errno == EAGAIN)
614 : {
615 0 : TLOG(TLVL_DEBUG + 32) << "No more requests from this rank. Continuing poll loop.";
616 0 : reading = false;
617 0 : }
618 0 : else if (stss < 0)
619 : {
620 0 : TLOG(TLVL_ERROR) << "Error reading from request socket: sts=" << sts << ", errno=" << errno << " (" << strerror(errno) << ")";
621 0 : close(received_events[n].data.fd);
622 0 : epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, received_events[n].data.fd, nullptr);
623 0 : reading = false;
624 : }
625 0 : else if (sts == sizeof(detail::RoutingRequest) && buff.header != ROUTING_MAGIC)
626 : {
627 0 : TLOG(TLVL_ERROR) << "Received invalid request from " << find_fd_(received_events[n].data.fd) << " sts=" << sts << ", header=" << std::hex << buff.header;
628 0 : reading = false;
629 0 : }
630 0 : else if (sts == sizeof(detail::RoutingRequest))
631 : {
632 0 : reading = false;
633 0 : sts = 0;
634 0 : TLOG(TLVL_DEBUG + 33) << "Received request from " << buff.rank << " mode=" << detail::RoutingRequest::RequestModeToString(buff.mode);
635 0 : detail::RoutingPacketEntry reply;
636 :
637 0 : switch (buff.mode)
638 : {
639 0 : case detail::RoutingRequest::RequestMode::Disconnect:
640 0 : connected_fds_[buff.rank].erase(received_events[n].data.fd);
641 0 : close(received_events[n].data.fd);
642 0 : epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, received_events[n].data.fd, nullptr);
643 0 : break;
644 :
645 0 : case detail::RoutingRequest::RequestMode::Request:
646 0 : reply = policy_->GetRouteForSequenceID(buff.sequence_id, buff.rank);
647 0 : if (reply.sequence_id == buff.sequence_id)
648 : {
649 0 : TLOG(TLVL_DEBUG + 33) << "Reply to request from " << buff.rank << " with route to " << reply.destination_rank << " for sequence ID " << buff.sequence_id;
650 0 : detail::RoutingPacketHeader hdr(1);
651 0 : write(received_events[n].data.fd, &hdr, sizeof(hdr));
652 0 : write(received_events[n].data.fd, &reply, sizeof(detail::RoutingPacketEntry));
653 : }
654 : else
655 : {
656 0 : TLOG(TLVL_DEBUG + 33) << "Unable to route request, replying with empty RoutingPacket";
657 0 : detail::RoutingPacketHeader hdr(0);
658 0 : write(received_events[n].data.fd, &hdr, sizeof(hdr));
659 : }
660 0 : break;
661 0 : default:
662 0 : TLOG(TLVL_WARNING) << "Received request from " << buff.rank << " with invalid mode " << detail::RoutingRequest::RequestModeToString(buff.mode) << " (currently only expecting Disconnect or Request)";
663 0 : break;
664 : }
665 : }
666 : }
667 : else
668 : {
669 0 : TLOG(TLVL_DEBUG + 32) << "Received event mask " << received_events[n].events << " from table socket rank " << find_fd_(received_events[n].data.fd);
670 : }
671 : }
672 : }
673 0 : }
674 0 : }
675 :
676 0 : int artdaq::RoutingManagerCore::find_fd_(int fd) const
677 : {
678 0 : for (auto& rank : connected_fds_)
679 : {
680 0 : if (rank.second.count(fd) != 0)
681 : {
682 0 : return rank.first;
683 : }
684 : }
685 0 : return -1;
686 : }
|