LCOV - code coverage report
Current view: top level - artdaq/Application - RoutingManagerCore.cc (source / functions) Coverage Total Hit
Test: artdaq.info.cleaned Lines: 0.0 % 402 0
Test Date: 2025-09-04 00:45:34 Functions: 0.0 % 71 0

            Line data    Source code
       1              : #include "TRACE/tracemf.h"
       2              : #include "artdaq/DAQdata/Globals.hh"                           // include these 2 first to get tracemf.h -
       3              : #define TRACE_NAME (app_name + "_RoutingManagerCore").c_str()  // before trace.h
       4              : 
       5              : #include "artdaq/Application/RoutingManagerCore.hh"
       6              : 
       7              : #include "artdaq-core/Utilities/ExceptionHandler.hh"
       8              : #include "artdaq/DAQdata/TCP_listen_fd.hh"
       9              : #include "artdaq/RoutingPolicies/makeRoutingManagerPolicy.hh"
      10              : 
      11              : #include "fhiclcpp/ParameterSet.h"
      12              : 
      13              : #include <arpa/inet.h>
      14              : #include <netdb.h>
      15              : #include <pthread.h>
      16              : #include <sched.h>
      17              : #include <sys/time.h>
      18              : #include <sys/un.h>
      19              : #include <algorithm>
      20              : #include <memory>
      21              : 
      22              : const std::string artdaq::RoutingManagerCore::
      23              :     TABLE_UPDATES_STAT_KEY("RoutingManagerCoreTableUpdates");
      24              : const std::string artdaq::RoutingManagerCore::
      25              :     TOKENS_RECEIVED_STAT_KEY("RoutingManagerCoreTokensReceived");
      26              : const std::string artdaq::RoutingManagerCore::
      27              :     CURRENT_TABLE_INTERVAL_STAT_KEY("RoutingManagerCoreCurrentTableInterval");
      28              : 
      29            0 : artdaq::RoutingManagerCore::RoutingManagerCore()
      30            0 :     : shutdown_requested_(false)
      31            0 :     , stop_requested_(true)
      32            0 :     , pause_requested_(false)
      33            0 :     , statsHelperPtr_(new artdaq::StatisticsHelper())
      34              : {
      35            0 :         TLOG(TLVL_DEBUG + 32) << "Constructor";
      36            0 :         statsHelperPtr_->addMonitoredQuantityName(TABLE_UPDATES_STAT_KEY);
      37            0 :         statsHelperPtr_->addMonitoredQuantityName(TOKENS_RECEIVED_STAT_KEY);
      38            0 :         statsHelperPtr_->addMonitoredQuantityName(CURRENT_TABLE_INTERVAL_STAT_KEY);
      39            0 : }
      40              : 
      41            0 : artdaq::RoutingManagerCore::~RoutingManagerCore()
      42              : {
      43            0 :         TLOG(TLVL_DEBUG + 32) << "Destructor";
      44            0 :         artdaq::StatisticsCollection::getInstance().requestStop();
      45            0 :         token_receiver_->stopTokenReception(true);
      46            0 : }
      47              : 
      48            0 : bool artdaq::RoutingManagerCore::initialize(fhicl::ParameterSet const& pset, uint64_t /*unused*/, uint64_t /*unused*/)
      49              : {
      50            0 :         TLOG(TLVL_DEBUG + 32) << "initialize method called with "
      51            0 :                               << "ParameterSet = \"" << pset.to_string()
      52            0 :                               << "\".";
      53              : 
      54              :         // pull out the relevant parts of the ParameterSet
      55            0 :         fhicl::ParameterSet daq_pset;
      56              :         try
      57              :         {
      58            0 :                 daq_pset = pset.get<fhicl::ParameterSet>("daq");
      59              :         }
      60            0 :         catch (...)
      61              :         {
      62            0 :                 TLOG(TLVL_ERROR)
      63            0 :                     << "Unable to find the DAQ parameters in the initialization "
      64            0 :                     << "ParameterSet: \"" + pset.to_string() + "\".";
      65            0 :                 return false;
      66            0 :         }
      67              : 
      68            0 :         if (daq_pset.has_key("rank"))
      69              :         {
      70            0 :                 if (my_rank >= 0 && daq_pset.get<int>("rank") != my_rank)
      71              :                 {
      72            0 :                         TLOG(TLVL_WARNING) << "Routing Manager rank specified at startup is different than rank specified at configure! Using rank received at configure!";
      73              :                 }
      74            0 :                 my_rank = daq_pset.get<int>("rank");
      75              :         }
      76            0 :         if (my_rank == -1)
      77              :         {
      78            0 :                 TLOG(TLVL_ERROR) << "Routing Manager rank not specified at startup or in configuration! Aborting";
      79            0 :                 exit(1);
      80              :         }
      81              : 
      82              :         try
      83              :         {
      84            0 :                 policy_pset_ = daq_pset.get<fhicl::ParameterSet>("policy");
      85              :         }
      86            0 :         catch (...)
      87              :         {
      88            0 :                 TLOG(TLVL_ERROR)
      89            0 :                     << "Unable to find the policy parameters in the DAQ initialization ParameterSet: \"" + daq_pset.to_string() + "\".";
      90            0 :                 return false;
      91            0 :         }
      92              : 
      93              :         try
      94              :         {
      95            0 :                 token_receiver_pset_ = daq_pset.get<fhicl::ParameterSet>("token_receiver");
      96              :         }
      97            0 :         catch (...)
      98              :         {
      99            0 :                 TLOG(TLVL_ERROR)
     100            0 :                     << "Unable to find the token_receiver parameters in the DAQ initialization ParameterSet: \"" + daq_pset.to_string() + "\".";
     101            0 :                 return false;
     102            0 :         }
     103              : 
     104              :         // pull out the Metric part of the ParameterSet
     105            0 :         fhicl::ParameterSet metric_pset;
     106              :         try
     107              :         {
     108            0 :                 metric_pset = daq_pset.get<fhicl::ParameterSet>("metrics");
     109              :         }
     110            0 :         catch (...)
     111            0 :         {}  // OK if there's no metrics table defined in the FHiCL
     112              : 
     113            0 :         if (metric_pset.is_empty())
     114              :         {
     115            0 :                 TLOG(TLVL_INFO) << "No metric plugins appear to be defined";
     116              :         }
     117              :         try
     118              :         {
     119            0 :                 metricMan->initialize(metric_pset, app_name);
     120              :         }
     121            0 :         catch (...)
     122              :         {
     123            0 :                 ExceptionHandler(ExceptionHandlerRethrow::no,
     124              :                                  "Error loading metrics in RoutingManagerCore::initialize()");
     125            0 :         }
     126              : 
     127              :         // create the requested RoutingPolicy
     128            0 :         auto policy_plugin_spec = policy_pset_.get<std::string>("policy", "");
     129            0 :         if (policy_plugin_spec.length() == 0)
     130              :         {
     131            0 :                 TLOG(TLVL_ERROR)
     132            0 :                     << "No fragment generator (parameter name = \"policy\") was "
     133            0 :                     << "specified in the policy ParameterSet.  The "
     134            0 :                     << "DAQ initialization PSet was \"" << daq_pset.to_string() << "\".";
     135            0 :                 return false;
     136              :         }
     137              :         try
     138              :         {
     139            0 :                 policy_ = artdaq::makeRoutingManagerPolicy(policy_plugin_spec, policy_pset_);
     140              :         }
     141            0 :         catch (...)
     142              :         {
     143            0 :                 std::stringstream exception_string;
     144              :                 exception_string << "Exception thrown during initialization of policy of type \""
     145            0 :                                  << policy_plugin_spec << "\"";
     146              : 
     147            0 :                 ExceptionHandler(ExceptionHandlerRethrow::no, exception_string.str());
     148              : 
     149            0 :                 TLOG(TLVL_DEBUG + 32) << "FHiCL parameter set used to initialize the policy which threw an exception: " << policy_pset_.to_string();
     150              : 
     151            0 :                 return false;
     152            0 :         }
     153              : 
     154            0 :         rt_priority_ = daq_pset.get<int>("rt_priority", 0);
     155            0 :         max_table_update_interval_ms_ = daq_pset.get<size_t>("table_update_interval_ms", 1000);
     156            0 :         current_table_interval_ms_ = max_table_update_interval_ms_;
     157            0 :         table_update_high_fraction_ = daq_pset.get<double>("table_update_interval_high_frac", 0.75);
     158            0 :         table_update_low_fraction_ = daq_pset.get<double>("table_update_interval_low_frac", 0.5);
     159              : 
     160              :         // fetch the monitoring parameters and create the MonitoredQuantity instances
     161            0 :         statsHelperPtr_->createCollectors(daq_pset, 100, 30.0, 60.0, TABLE_UPDATES_STAT_KEY);
     162              : 
     163              :         // create the requested TokenReceiver
     164            0 :         token_receiver_ = std::make_unique<TokenReceiver>(token_receiver_pset_, policy_, max_table_update_interval_ms_);
     165            0 :         token_receiver_->setStatsHelper(statsHelperPtr_, TOKENS_RECEIVED_STAT_KEY);
     166            0 :         token_receiver_->startTokenReception();
     167            0 :         token_receiver_->pauseTokenReception();
     168              : 
     169            0 :         table_listen_port_ = daq_pset.get<int>("table_update_port", 35556);
     170              : 
     171            0 :         shutdown_requested_.store(true);
     172            0 :         if (listen_thread_ && listen_thread_->joinable())
     173              :         {
     174            0 :                 listen_thread_->join();
     175              :         }
     176            0 :         shutdown_requested_.store(false);
     177            0 :         TLOG(TLVL_INFO) << "Starting Listener Thread";
     178              : 
     179              :         try
     180              :         {
     181            0 :                 listen_thread_ = std::make_unique<boost::thread>(&RoutingManagerCore::listen_, this);
     182              :         }
     183            0 :         catch (const boost::exception& e)
     184              :         {
     185            0 :                 TLOG(TLVL_ERROR) << "Caught boost::exception starting TCP Socket Listen thread: " << boost::diagnostic_information(e) << ", errno=" << errno;
     186            0 :                 std::cerr << "Caught boost::exception starting TCP Socket Listen thread: " << boost::diagnostic_information(e) << ", errno=" << errno << std::endl;
     187            0 :                 exit(5);
     188            0 :         }
     189            0 :         return true;
     190            0 : }
     191              : 
     192            0 : bool artdaq::RoutingManagerCore::start(art::RunID id, uint64_t /*unused*/, uint64_t /*unused*/)
     193              : {
     194            0 :         run_id_ = id;
     195            0 :         stop_requested_.store(false);
     196            0 :         pause_requested_.store(false);
     197              : 
     198            0 :         statsHelperPtr_->resetStatistics();
     199              : 
     200            0 :         metricMan->do_start();
     201            0 :         table_update_count_ = 0;
     202            0 :         token_receiver_->setRunNumber(run_id_.run());
     203            0 :         token_receiver_->resumeTokenReception();
     204              : 
     205            0 :         TLOG(TLVL_INFO) << "Started run " << run_id_.run();
     206            0 :         return true;
     207              : }
     208              : 
     209            0 : bool artdaq::RoutingManagerCore::stop(uint64_t /*unused*/, uint64_t /*unused*/)
     210              : {
     211            0 :         TLOG(TLVL_INFO) << "Stopping run " << run_id_.run()
     212            0 :                         << " after " << table_update_count_ << " table updates."
     213            0 :                         << " and " << token_receiver_->getReceivedTokenCount() << " received tokens.";
     214            0 :         stop_requested_.store(true);
     215            0 :         token_receiver_->pauseTokenReception();
     216            0 :         run_id_ = art::RunID::flushRun();
     217            0 :         return true;
     218              : }
     219              : 
     220            0 : bool artdaq::RoutingManagerCore::pause(uint64_t /*unused*/, uint64_t /*unused*/)
     221              : {
     222            0 :         TLOG(TLVL_INFO) << "Pausing run " << run_id_.run()
     223            0 :                         << " after " << table_update_count_ << " table updates."
     224            0 :                         << " and " << token_receiver_->getReceivedTokenCount() << " received tokens.";
     225            0 :         pause_requested_.store(true);
     226            0 :         return true;
     227              : }
     228              : 
     229            0 : bool artdaq::RoutingManagerCore::resume(uint64_t /*unused*/, uint64_t /*unused*/)
     230              : {
     231            0 :         TLOG(TLVL_DEBUG + 32) << "Resuming run " << run_id_.run();
     232            0 :         pause_requested_.store(false);
     233            0 :         metricMan->do_start();
     234            0 :         return true;
     235              : }
     236              : 
     237            0 : bool artdaq::RoutingManagerCore::shutdown(uint64_t /*unused*/)
     238              : {
     239            0 :         shutdown_requested_.store(true);
     240            0 :         if (listen_thread_ && listen_thread_->joinable())
     241              :         {
     242            0 :                 listen_thread_->join();
     243              :         }
     244            0 :         token_receiver_->stopTokenReception();
     245            0 :         policy_.reset();
     246            0 :         metricMan->shutdown();
     247            0 :         return true;
     248              : }
     249              : 
     250            0 : bool artdaq::RoutingManagerCore::soft_initialize(fhicl::ParameterSet const& pset, uint64_t timeout, uint64_t timestamp)
     251              : {
     252            0 :         TLOG(TLVL_INFO) << "soft_initialize method called with "
     253            0 :                         << "ParameterSet = \"" << pset.to_string()
     254            0 :                         << "\".";
     255            0 :         return initialize(pset, timeout, timestamp);
     256              : }
     257              : 
     258            0 : bool artdaq::RoutingManagerCore::reinitialize(fhicl::ParameterSet const& pset, uint64_t timeout, uint64_t timestamp)
     259              : {
     260            0 :         TLOG(TLVL_INFO) << "reinitialize method called with "
     261            0 :                         << "ParameterSet = \"" << pset.to_string()
     262            0 :                         << "\".";
     263            0 :         return initialize(pset, timeout, timestamp);
     264              : }
     265              : 
     266            0 : void artdaq::RoutingManagerCore::process_event_table()
     267              : {
     268            0 :         if (rt_priority_ > 0)
     269              :         {
     270              : #pragma GCC diagnostic push
     271              : #pragma GCC diagnostic ignored "-Wmissing-field-initializers"
     272            0 :                 sched_param s_param = {};
     273            0 :                 s_param.sched_priority = rt_priority_;
     274            0 :                 if (pthread_setschedparam(pthread_self(), SCHED_RR, &s_param) != 0)
     275              :                 {
     276            0 :                         TLOG(TLVL_WARNING) << "setting realtime priority failed";
     277              :                 }
     278              : #pragma GCC diagnostic pop
     279              :         }
     280              : 
     281              :         // try-catch block here?
     282              : 
     283              :         // how to turn RT PRI off?
     284            0 :         if (rt_priority_ > 0)
     285              :         {
     286              : #pragma GCC diagnostic push
     287              : #pragma GCC diagnostic ignored "-Wmissing-field-initializers"
     288            0 :                 sched_param s_param = {};
     289            0 :                 s_param.sched_priority = rt_priority_;
     290            0 :                 int status = pthread_setschedparam(pthread_self(), SCHED_RR, &s_param);
     291            0 :                 if (status != 0)
     292              :                 {
     293            0 :                         TLOG(TLVL_ERROR)
     294            0 :                             << "Failed to set realtime priority to " << rt_priority_
     295            0 :                             << ", return code = " << status;
     296              :                 }
     297              : #pragma GCC diagnostic pop
     298              :         }
     299              : 
     300              :         // MPI_Barrier(local_group_comm_);
     301              : 
     302            0 :         TLOG(TLVL_DEBUG + 32) << "Sending initial table.";
     303            0 :         auto startTime = artdaq::MonitoredQuantity::getCurrentTime();
     304            0 :         auto nextSendTime = startTime;
     305              :         double delta_time;
     306            0 :         while (!stop_requested_ && !pause_requested_)
     307              :         {
     308            0 :                 receive_();
     309            0 :                 if (policy_->GetRoutingMode() == detail::RoutingManagerMode::EventBuilding || policy_->GetRoutingMode() == detail::RoutingManagerMode::RequestBasedEventBuilding)
     310              :                 {
     311            0 :                         startTime = artdaq::MonitoredQuantity::getCurrentTime();
     312              : 
     313            0 :                         if (startTime >= nextSendTime)
     314              :                         {
     315            0 :                                 auto table = policy_->GetCurrentTable();
     316              : 
     317            0 :                                 if (table.empty())
     318              :                                 {
     319            0 :                                         TLOG(TLVL_WARNING) << "Routing Policy generated Empty table for this routing interval (" << current_table_interval_ms_ << " ms)! This may indicate issues with the receivers, if it persists."
     320            0 :                                                            << " Next seqID=" << policy_->GetNextSequenceID() << ", Policy held tokens=" << policy_->GetHeldTokenCount();
     321              :                                 }
     322              :                                 else
     323              :                                 {
     324            0 :                                         send_event_table(table);
     325            0 :                                         ++table_update_count_;
     326            0 :                                         delta_time = artdaq::MonitoredQuantity::getCurrentTime() - startTime;
     327            0 :                                         statsHelperPtr_->addSample(TABLE_UPDATES_STAT_KEY, delta_time);
     328            0 :                                         TLOG(TLVL_DEBUG + 34) << "process_fragments TABLE_UPDATES_STAT_KEY=" << delta_time;
     329              : 
     330            0 :                                         bool readyToReport = statsHelperPtr_->readyToReport();
     331            0 :                                         if (readyToReport)
     332              :                                         {
     333            0 :                                                 std::string statString = buildStatisticsString_();
     334            0 :                                                 TLOG(TLVL_INFO) << statString;
     335            0 :                                                 sendMetrics_();
     336            0 :                                         }
     337              :                                 }
     338              : 
     339            0 :                                 auto max_tokens = policy_->GetMaxNumberOfTokens();
     340            0 :                                 if (max_tokens > 0)
     341              :                                 {
     342            0 :                                         auto frac = policy_->GetTokensUsedSinceLastUpdate() / static_cast<double>(max_tokens);
     343            0 :                                         policy_->ResetTokensUsedSinceLastUpdate();
     344            0 :                                         if (frac > table_update_high_fraction_) current_table_interval_ms_ = 9 * current_table_interval_ms_ / 10;
     345            0 :                                         if (frac < table_update_low_fraction_) current_table_interval_ms_ = 11 * current_table_interval_ms_ / 10;
     346            0 :                                         if (current_table_interval_ms_ > max_table_update_interval_ms_) current_table_interval_ms_ = max_table_update_interval_ms_;
     347            0 :                                         if (current_table_interval_ms_ < 1) current_table_interval_ms_ = 1;
     348              :                                 }
     349            0 :                                 nextSendTime = startTime + current_table_interval_ms_ / 1000.0;
     350            0 :                                 TLOG(TLVL_DEBUG + 32) << "current_table_interval_ms is now " << current_table_interval_ms_;
     351            0 :                                 statsHelperPtr_->addSample(CURRENT_TABLE_INTERVAL_STAT_KEY, current_table_interval_ms_ / 1000.0);
     352            0 :                         }
     353              :                         else
     354              :                         {
     355            0 :                                 usleep(current_table_interval_ms_ * 10);  // 1/100 of the table update interval
     356              :                         }
     357              :                 }
     358              :         }
     359              : 
     360            0 :         TLOG(TLVL_DEBUG + 32) << "stop_requested_ is " << stop_requested_ << ", pause_requested_ is " << pause_requested_ << ", exiting process_event_table loop";
     361            0 :         policy_->Reset();
     362            0 :         metricMan->do_stop();
     363            0 : }
     364              : 
     365            0 : void artdaq::RoutingManagerCore::send_event_table(detail::RoutingPacket packet)
     366              : {
     367            0 :         std::lock_guard<std::mutex> lk(fd_mutex_);
     368            0 :         for (auto& dest : connected_fds_)
     369              :         {
     370            0 :                 for (auto& connected_fd : dest.second)
     371              :                 {
     372            0 :                         auto header = detail::RoutingPacketHeader(packet.size());
     373            0 :                         TLOG(TLVL_DEBUG + 32) << "Sending table information for " << header.nEntries << " events to destination " << dest.first;
     374            0 :                         TRACE(16, "headerData:0x%016lx%016lx packetData:0x%016lx%016lx", ((unsigned long*)&header)[0], ((unsigned long*)&header)[1], ((unsigned long*)&packet[0])[0], ((unsigned long*)&packet[0])[1]);  // NOLINT
     375            0 :                         auto sts = write(connected_fd, &header, sizeof(header));
     376            0 :                         if (sts != sizeof(header))
     377              :                         {
     378            0 :                                 TLOG(TLVL_ERROR) << "Error sending routing header to fd " << connected_fd << ", rank " << dest.first;
     379              :                         }
     380              :                         else
     381              :                         {
     382            0 :                                 sts = write(connected_fd, &packet[0], packet.size() * sizeof(detail::RoutingPacketEntry));
     383            0 :                                 if (sts != static_cast<ssize_t>(packet.size() * sizeof(detail::RoutingPacketEntry)))
     384              :                                 {
     385            0 :                                         TLOG(TLVL_ERROR) << "Error sending routing table. sts=" << sts << "/" << packet.size() * sizeof(detail::RoutingPacketEntry) << ", fd=" << connected_fd << ", rank=" << dest.first;
     386              :                                 }
     387              :                         }
     388              :                 }
     389              :         }
     390            0 : }
     391              : 
     392            0 : std::string artdaq::RoutingManagerCore::report(std::string const& /*unused*/) const
     393              : {
     394            0 :         std::string resultString;
     395              : 
     396              :         // if we haven't been able to come up with any report so far, say so
     397            0 :         auto tmpString = app_name + " run number = " + std::to_string(run_id_.run()) + ", table updates sent = " + std::to_string(table_update_count_) + ", Receiver tokens received = " + std::to_string(token_receiver_->getReceivedTokenCount());
     398            0 :         return tmpString;
     399            0 : }
     400              : 
     401            0 : std::string artdaq::RoutingManagerCore::buildStatisticsString_() const
     402              : {
     403            0 :         std::ostringstream oss;
     404            0 :         oss << app_name << " statistics:" << std::endl;
     405              : 
     406            0 :         auto mqPtr = artdaq::StatisticsCollection::getInstance().getMonitoredQuantity(TABLE_UPDATES_STAT_KEY);
     407            0 :         if (mqPtr != nullptr)
     408              :         {
     409            0 :                 artdaq::MonitoredQuantityStats stats;
     410            0 :                 mqPtr->getStats(stats);
     411            0 :                 oss << "  Table Update statistics: "
     412            0 :                     << stats.recentSampleCount << " table updates sent at "
     413            0 :                     << stats.recentSampleRate << " table updates/sec, , monitor window = "
     414            0 :                     << stats.recentDuration << " sec" << std::endl;
     415            0 :                 oss << "  Average times per table update: ";
     416            0 :                 if (stats.recentSampleRate > 0.0)
     417              :                 {
     418            0 :                         oss << " elapsed time = "
     419            0 :                             << (1.0 / stats.recentSampleRate) << " sec";
     420              :                 }
     421            0 :         }
     422              : 
     423            0 :         mqPtr = artdaq::StatisticsCollection::getInstance().getMonitoredQuantity(TOKENS_RECEIVED_STAT_KEY);
     424            0 :         if (mqPtr != nullptr)
     425              :         {
     426            0 :                 artdaq::MonitoredQuantityStats stats;
     427            0 :                 mqPtr->getStats(stats);
     428            0 :                 oss << "  Received Token statistics: "
     429            0 :                     << stats.recentSampleCount << " tokens received at "
     430            0 :                     << stats.recentSampleRate << " tokens/sec, , monitor window = "
     431            0 :                     << stats.recentDuration << " sec" << std::endl;
     432            0 :                 oss << "  Average times per token: ";
     433            0 :                 if (stats.recentSampleRate > 0.0)
     434              :                 {
     435            0 :                         oss << " elapsed time = "
     436            0 :                             << (1.0 / stats.recentSampleRate) << " sec";
     437              :                 }
     438            0 :                 oss << ", input token wait time = "
     439            0 :                     << mqPtr->getRecentValueSum() << " sec" << std::endl;
     440            0 :         }
     441              : 
     442            0 :         return oss.str();
     443            0 : }
     444              : 
     445            0 : void artdaq::RoutingManagerCore::sendMetrics_()
     446              : {
     447            0 :         if (metricMan)
     448              :         {
     449            0 :                 auto mqPtr = artdaq::StatisticsCollection::getInstance().getMonitoredQuantity(TABLE_UPDATES_STAT_KEY);
     450            0 :                 if (mqPtr != nullptr)
     451              :                 {
     452            0 :                         artdaq::MonitoredQuantityStats stats;
     453            0 :                         mqPtr->getStats(stats);
     454            0 :                         metricMan->sendMetric("Table Update Count", stats.fullSampleCount, "updates", 1, MetricMode::LastPoint);
     455            0 :                         metricMan->sendMetric("Table Update Rate", stats.recentSampleRate, "updates/sec", 1, MetricMode::Average);
     456            0 :                 }
     457              : 
     458            0 :                 mqPtr = artdaq::StatisticsCollection::getInstance().getMonitoredQuantity(TOKENS_RECEIVED_STAT_KEY);
     459            0 :                 if (mqPtr != nullptr)
     460              :                 {
     461            0 :                         artdaq::MonitoredQuantityStats stats;
     462            0 :                         mqPtr->getStats(stats);
     463            0 :                         metricMan->sendMetric("Receiver Token Count", stats.fullSampleCount, "updates", 1, MetricMode::LastPoint);
     464            0 :                         metricMan->sendMetric("Receiver Token Rate", stats.recentSampleRate, "updates/sec", 1, MetricMode::Average);
     465            0 :                         metricMan->sendMetric("Total Receiver Token Wait Time", mqPtr->getRecentValueSum(), "seconds", 3, MetricMode::Average);
     466            0 :                 }
     467              : 
     468            0 :                 mqPtr = artdaq::StatisticsCollection::getInstance().getMonitoredQuantity(CURRENT_TABLE_INTERVAL_STAT_KEY);
     469            0 :                 if (mqPtr.get() != nullptr)
     470              :                 {
     471            0 :                         artdaq::MonitoredQuantityStats stats;
     472            0 :                         mqPtr->getStats(stats);
     473            0 :                         metricMan->sendMetric("Table Update Interval", stats.recentValueAverage, "s", 3, MetricMode::Average);
     474            0 :                 }
     475            0 :         }
     476            0 : }
     477              : 
     478            0 : void artdaq::RoutingManagerCore::listen_()
     479              : {
     480            0 :         if (epoll_fd_ == -1)
     481              :         {
     482            0 :                 epoll_fd_ = epoll_create1(0);
     483              :         }
     484            0 :         int listen_fd = -1;
     485            0 :         while (shutdown_requested_ == false)
     486              :         {
     487            0 :                 TLOG(TLVL_DEBUG + 33) << "listen_: Listening/accepting new connections on port " << table_listen_port_;
     488            0 :                 if (listen_fd == -1)
     489              :                 {
     490            0 :                         TLOG(TLVL_DEBUG + 32) << "listen_: Opening listener";
     491            0 :                         listen_fd = TCP_listen_fd(table_listen_port_, 0);
     492              :                 }
     493            0 :                 if (listen_fd == -1)
     494              :                 {
     495            0 :                         TLOG(TLVL_DEBUG + 32) << "listen_: Error creating listen_fd!";
     496            0 :                         break;
     497              :                 }
     498              : 
     499              :                 int res;
     500            0 :                 timeval tv = {2, 0};  // maybe increase of some global "debugging" flag set???
     501              :                 fd_set rfds;
     502            0 :                 FD_ZERO(&rfds);
     503            0 :                 FD_SET(listen_fd, &rfds);  // NOLINT
     504              : 
     505            0 :                 res = select(listen_fd + 1, &rfds, static_cast<fd_set*>(nullptr), static_cast<fd_set*>(nullptr), &tv);
     506            0 :                 if (res > 0)
     507              :                 {
     508              :                         int sts;
     509              :                         sockaddr_un un;
     510            0 :                         socklen_t arglen = sizeof(un);
     511              :                         int fd;
     512            0 :                         TLOG(TLVL_DEBUG + 32) << "listen_: Calling accept";
     513            0 :                         fd = accept(listen_fd, reinterpret_cast<sockaddr*>(&un), &arglen);  // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
     514            0 :                         TLOG(TLVL_DEBUG + 32) << "listen_: Done with accept";
     515              : 
     516            0 :                         TLOG(TLVL_DEBUG + 32) << "listen_: Reading connect message";
     517            0 :                         socklen_t lenlen = sizeof(tv);
     518              :                         /*sts=*/
     519            0 :                         setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, lenlen);  // see man 7 socket.
     520            0 :                         detail::RoutingRequest rch;
     521            0 :                         uint64_t mark_us = TimeUtils::gettimeofday_us();
     522            0 :                         sts = read(fd, &rch, sizeof(rch));
     523            0 :                         uint64_t delta_us = TimeUtils::gettimeofday_us() - mark_us;
     524            0 :                         TLOG(TLVL_DEBUG + 32) << "listen_: Read of connect message took " << delta_us << " microseconds.";
     525            0 :                         if (sts != sizeof(rch))
     526              :                         {
     527            0 :                                 TLOG(TLVL_DEBUG + 32) << "listen_: Wrong message header length received!";
     528            0 :                                 close(fd);
     529            0 :                                 continue;
     530            0 :                         }
     531              : 
     532              :                         // check for "magic" and valid source_id(aka rank)
     533            0 :                         if (rch.header != ROUTING_MAGIC || !(rch.mode == detail::RoutingRequest::RequestMode::Connect))
     534              :                         {
     535            0 :                                 TLOG(TLVL_DEBUG + 32) << "listen_: Wrong magic bytes in header! rch.header: " << std::hex << rch.header;
     536            0 :                                 close(fd);
     537            0 :                                 continue;
     538            0 :                         }
     539              : 
     540              :                         // now add (new) connection
     541            0 :                         std::lock_guard<std::mutex> lk(fd_mutex_);
     542            0 :                         connected_fds_[rch.rank].insert(fd);
     543              :                         struct epoll_event ev;
     544            0 :                         ev.data.fd = fd;
     545            0 :                         ev.events = EPOLLIN;
     546            0 :                         epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev);
     547            0 :                         TLOG(TLVL_INFO) << "listen_: New fd is " << fd << " for table receiver rank " << rch.rank;
     548            0 :                 }
     549              :                 else
     550              :                 {
     551            0 :                         TLOG(TLVL_DEBUG + 34) << "listen_: No connections in timeout interval!";
     552              :                 }
     553              :         }
     554              : 
     555            0 :         TLOG(TLVL_INFO) << "listen_: Shutting down connection listener";
     556            0 :         if (listen_fd != -1)
     557              :         {
     558            0 :                 close(listen_fd);
     559              :         }
     560            0 :         std::lock_guard<std::mutex> lk(fd_mutex_);
     561            0 :         for (auto& fd_set : connected_fds_)
     562              :         {
     563            0 :                 for (auto& fd : fd_set.second)
     564              :                 {
     565            0 :                         epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
     566            0 :                         close(fd);
     567              :                 }
     568              :         }
     569            0 :         connected_fds_.clear();
     570              : 
     571            0 : }  // listen_
     572              : 
     573            0 : void artdaq::RoutingManagerCore::receive_()
     574              : {
     575            0 :         if (epoll_fd_ == -1)
     576              :         {
     577            0 :                 epoll_fd_ = epoll_create1(0);
     578              :         }
     579            0 :         std::vector<epoll_event> received_events(10);
     580              : 
     581            0 :         int nfds = 1;
     582            0 :         while (nfds > 0)
     583              :         {
     584            0 :                 std::lock_guard<std::mutex> lk(fd_mutex_);
     585            0 :                 nfds = epoll_wait(epoll_fd_, &received_events[0], received_events.size(), 1);
     586            0 :                 if (nfds == -1)
     587              :                 {
     588            0 :                         TLOG(TLVL_ERROR) << "Error status received from epoll_wait, exiting with code " << EXIT_FAILURE << ", errno=" << errno << " (" << strerror(errno) << ")";
     589            0 :                         perror("epoll_wait");
     590            0 :                         exit(EXIT_FAILURE);
     591              :                 }
     592              : 
     593            0 :                 if (nfds > 0)
     594              :                 {
     595            0 :                         TLOG(TLVL_DEBUG + 35) << "Received " << nfds << " events on table sockets";
     596              :                 }
     597            0 :                 for (auto n = 0; n < nfds; ++n)
     598              :                 {
     599            0 :                         bool reading = true;
     600            0 :                         int sts = 0;
     601            0 :                         while (reading)
     602              :                         {
     603            0 :                                 if ((received_events[n].events & EPOLLIN) != 0)
     604              :                                 {
     605            0 :                                         detail::RoutingRequest buff;
     606            0 :                                         auto stss = read(received_events[n].data.fd, &buff, sizeof(detail::RoutingRequest) - sts);
     607            0 :                                         sts += stss;
     608            0 :                                         if (stss == 0)
     609              :                                         {
     610            0 :                                                 TLOG(TLVL_INFO) << "Received 0-size request from " << find_fd_(received_events[n].data.fd);
     611            0 :                                                 reading = false;
     612              :                                         }
     613            0 :                                         else if (stss < 0 && errno == EAGAIN)
     614              :                                         {
     615            0 :                                                 TLOG(TLVL_DEBUG + 32) << "No more requests from this rank. Continuing poll loop.";
     616            0 :                                                 reading = false;
     617            0 :                                         }
     618            0 :                                         else if (stss < 0)
     619              :                                         {
     620            0 :                                                 TLOG(TLVL_ERROR) << "Error reading from request socket: sts=" << sts << ", errno=" << errno << " (" << strerror(errno) << ")";
     621            0 :                                                 close(received_events[n].data.fd);
     622            0 :                                                 epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, received_events[n].data.fd, nullptr);
     623            0 :                                                 reading = false;
     624              :                                         }
     625            0 :                                         else if (sts == sizeof(detail::RoutingRequest) && buff.header != ROUTING_MAGIC)
     626              :                                         {
     627            0 :                                                 TLOG(TLVL_ERROR) << "Received invalid request from " << find_fd_(received_events[n].data.fd) << " sts=" << sts << ", header=" << std::hex << buff.header;
     628            0 :                                                 reading = false;
     629            0 :                                         }
     630            0 :                                         else if (sts == sizeof(detail::RoutingRequest))
     631              :                                         {
     632            0 :                                                 reading = false;
     633            0 :                                                 sts = 0;
     634            0 :                                                 TLOG(TLVL_DEBUG + 33) << "Received request from " << buff.rank << " mode=" << detail::RoutingRequest::RequestModeToString(buff.mode);
     635            0 :                                                 detail::RoutingPacketEntry reply;
     636              : 
     637            0 :                                                 switch (buff.mode)
     638              :                                                 {
     639            0 :                                                         case detail::RoutingRequest::RequestMode::Disconnect:
     640            0 :                                                                 connected_fds_[buff.rank].erase(received_events[n].data.fd);
     641            0 :                                                                 close(received_events[n].data.fd);
     642            0 :                                                                 epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, received_events[n].data.fd, nullptr);
     643            0 :                                                                 break;
     644              : 
     645            0 :                                                         case detail::RoutingRequest::RequestMode::Request:
     646            0 :                                                                 reply = policy_->GetRouteForSequenceID(buff.sequence_id, buff.rank);
     647            0 :                                                                 if (reply.sequence_id == buff.sequence_id)
     648              :                                                                 {
     649            0 :                                                                         TLOG(TLVL_DEBUG + 33) << "Reply to request from " << buff.rank << " with route to " << reply.destination_rank << " for sequence ID " << buff.sequence_id;
     650            0 :                                                                         detail::RoutingPacketHeader hdr(1);
     651            0 :                                                                         write(received_events[n].data.fd, &hdr, sizeof(hdr));
     652            0 :                                                                         write(received_events[n].data.fd, &reply, sizeof(detail::RoutingPacketEntry));
     653              :                                                                 }
     654              :                                                                 else
     655              :                                                                 {
     656            0 :                                                                         TLOG(TLVL_DEBUG + 33) << "Unable to route request, replying with empty RoutingPacket";
     657            0 :                                                                         detail::RoutingPacketHeader hdr(0);
     658            0 :                                                                         write(received_events[n].data.fd, &hdr, sizeof(hdr));
     659              :                                                                 }
     660            0 :                                                                 break;
     661            0 :                                                         default:
     662            0 :                                                                 TLOG(TLVL_WARNING) << "Received request from " << buff.rank << " with invalid mode " << detail::RoutingRequest::RequestModeToString(buff.mode) << " (currently only expecting Disconnect or Request)";
     663            0 :                                                                 break;
     664              :                                                 }
     665              :                                         }
     666              :                                 }
     667              :                                 else
     668              :                                 {
     669            0 :                                         TLOG(TLVL_DEBUG + 32) << "Received event mask " << received_events[n].events << " from table socket rank " << find_fd_(received_events[n].data.fd);
     670              :                                 }
     671              :                         }
     672              :                 }
     673            0 :         }
     674            0 : }
     675              : 
     676            0 : int artdaq::RoutingManagerCore::find_fd_(int fd) const
     677              : {
     678            0 :         for (auto& rank : connected_fds_)
     679              :         {
     680            0 :                 if (rank.second.count(fd) != 0)
     681              :                 {
     682            0 :                         return rank.first;
     683              :                 }
     684              :         }
     685            0 :         return -1;
     686              : }
        

Generated by: LCOV version 2.0-1