LCOV - code coverage report
Current view: top level - artdaq/DAQrate/detail - TokenReceiver.cc (source / functions) Coverage Total Hit
Test: artdaq.info.cleaned Lines: 0.0 % 127 0
Test Date: 2025-09-04 00:45:34 Functions: 0.0 % 28 0

            Line data    Source code
       1              : #include "artdaq/DAQdata/Globals.hh"
       2              : #define TRACE_NAME (app_name + "_TokenReceiver").c_str()
       3              : 
       4              : #include <arpa/inet.h>
       5              : 
       6              : #include <utility>
       7              : 
       8              : #include <utility>
       9              : #include "artdaq/DAQdata/TCP_listen_fd.hh"
      10              : #include "artdaq/DAQrate/detail/TokenReceiver.hh"
      11              : 
      12            0 : artdaq::TokenReceiver::TokenReceiver(const fhicl::ParameterSet& ps, std::shared_ptr<RoutingManagerPolicy> policy,
      13            0 :                                      size_t update_interval_msec)
      14            0 :     : token_port_(ps.get<int>("routing_token_port", 35555))
      15            0 :     , policy_(std::move(std::move(policy)))
      16            0 :     , update_interval_msec_(update_interval_msec)
      17            0 :     , token_socket_(-1)
      18            0 :     , token_epoll_fd_(-1)
      19            0 :     , thread_is_running_(false)
      20            0 :     , reception_is_paused_(false)
      21            0 :     , shutdown_requested_(false)
      22            0 :     , run_number_(0)
      23            0 :     , statsHelperPtr_(nullptr)
      24              : {
      25            0 :         receive_token_events_ = std::vector<epoll_event>(policy_->GetReceiverCount() + 1);
      26            0 : }
      27              : 
      28            0 : artdaq::TokenReceiver::~TokenReceiver()
      29              : {
      30            0 :         stopTokenReception(true);
      31            0 : }
      32              : 
      33            0 : void artdaq::TokenReceiver::startTokenReception()
      34              : {
      35            0 :         if (token_thread_.joinable())
      36              :         {
      37            0 :                 token_thread_.join();
      38              :         }
      39            0 :         boost::thread::attributes attrs;
      40            0 :         attrs.set_stack_size(4096 * 2000);  // 8000 KB
      41              : 
      42            0 :         reception_is_paused_ = false;
      43            0 :         shutdown_requested_ = false;
      44              : 
      45            0 :         TLOG(TLVL_INFO) << "Starting Token Reception Thread";
      46              :         try
      47              :         {
      48            0 :                 token_thread_ = boost::thread(attrs, boost::bind(&TokenReceiver::receiveTokensLoop_, this));
      49              :                 char tname[16];                                               // Size 16 - see man page pthread_setname_np(3) and/or prctl(2)
      50            0 :                 snprintf(tname, sizeof(tname) - 1, "%d-TokenRecv", my_rank);  // NOLINT
      51            0 :                 tname[sizeof(tname) - 1] = '\0';                              // assure term. snprintf is not too evil :)
      52            0 :                 auto handle = token_thread_.native_handle();
      53            0 :                 pthread_setname_np(handle, tname);
      54              :         }
      55            0 :         catch (boost::exception const& e)
      56              :         {
      57            0 :                 TLOG(TLVL_ERROR) << "Exception encountered starting Token Reception thread: " << boost::diagnostic_information(e) << ", errno=" << errno;
      58            0 :                 std::cerr << "Exception encountered starting Token Reception thread: " << boost::diagnostic_information(e) << ", errno=" << errno << std::endl;
      59            0 :                 exit(3);
      60            0 :         }
      61            0 :         received_token_count_ = 0;
      62            0 :         thread_is_running_ = true;
      63            0 :         TLOG(TLVL_INFO) << "Started Token Reception Thread";
      64            0 : }
      65              : 
      66            0 : void artdaq::TokenReceiver::stopTokenReception(bool force)
      67              : {
      68            0 :         shutdown_requested_ = true;
      69            0 :         reception_is_paused_ = false;
      70            0 :         if (thread_is_running_)
      71              :         {
      72            0 :                 if (received_token_count_ == 0 && !force)
      73              :                 {
      74            0 :                         TLOG(TLVL_DEBUG + 32) << "Stop request received by TokenReceiver, but no tokens have ever been received.";
      75              :                 }
      76            0 :                 TLOG(TLVL_DEBUG + 32) << "Joining tokenThread";
      77              :                 try
      78              :                 {
      79            0 :                         if (token_thread_.joinable())
      80              :                         {
      81            0 :                                 token_thread_.join();
      82              :                         }
      83              :                 }
      84            0 :                 catch (...)
      85              :                 {
      86              :                         // IGNORED
      87            0 :                 }
      88            0 :                 thread_is_running_ = false;
      89              :         }
      90              : 
      91            0 :         if (token_socket_ != -1)
      92              :         {
      93            0 :                 close(token_socket_);
      94            0 :                 token_socket_ = -1;
      95            0 :                 token_epoll_fd_ = -1;
      96              :         }
      97            0 : }
      98              : 
      99            0 : void artdaq::TokenReceiver::receiveTokensLoop_()
     100              : {
     101            0 :         while (!shutdown_requested_)
     102              :         {
     103            0 :                 TLOG(TLVL_DEBUG + 33) << "Receive Token loop start";
     104            0 :                 if (token_socket_ == -1)
     105              :                 {
     106            0 :                         TLOG(TLVL_DEBUG + 32) << "Opening token listener socket";
     107            0 :                         token_socket_ = TCP_listen_fd(token_port_, 3 * sizeof(detail::RoutingToken));
     108              : 
     109            0 :                         if (token_epoll_fd_ != -1)
     110              :                         {
     111            0 :                                 close(token_epoll_fd_);
     112              :                         }
     113              :                         struct epoll_event ev;
     114            0 :                         token_epoll_fd_ = epoll_create1(0);
     115            0 :                         ev.events = EPOLLIN;
     116            0 :                         ev.data.fd = token_socket_;
     117            0 :                         if (epoll_ctl(token_epoll_fd_, EPOLL_CTL_ADD, token_socket_, &ev) == -1)
     118              :                         {
     119            0 :                                 TLOG(TLVL_ERROR) << "Could not register listen socket to epoll fd";
     120            0 :                                 exit(3);
     121              :                         }
     122              :                 }
     123            0 :                 if (token_socket_ == -1 || token_epoll_fd_ == -1)
     124              :                 {
     125            0 :                         TLOG(TLVL_DEBUG + 32) << "One of the listen sockets was not opened successfully.";
     126            0 :                         return;
     127              :                 }
     128              : 
     129            0 :                 auto nfds = epoll_wait(token_epoll_fd_, &receive_token_events_[0], receive_token_events_.size(), update_interval_msec_);
     130            0 :                 if (nfds == -1)
     131              :                 {
     132            0 :                         TLOG(TLVL_ERROR) << "Error status received from epoll_wait, exiting with code " << EXIT_FAILURE << ", errno=" << errno << " (" << strerror(errno) << ")";
     133            0 :                         perror("epoll_wait");
     134            0 :                         exit(EXIT_FAILURE);
     135              :                 }
     136              : 
     137            0 :                 while (reception_is_paused_ && !shutdown_requested_)
     138              :                 {
     139            0 :                         usleep(10000);
     140              :                 }
     141              : 
     142            0 :                 TLOG(TLVL_DEBUG + 35) << "Received " << nfds << " events on token sockets";
     143            0 :                 for (auto n = 0; n < nfds; ++n)
     144              :                 {
     145            0 :                         if (receive_token_events_[n].data.fd == token_socket_)
     146              :                         {
     147            0 :                                 TLOG(TLVL_DEBUG + 32) << "Accepting new connection on token_socket";
     148              :                                 sockaddr_in addr;
     149            0 :                                 socklen_t arglen = sizeof(addr);
     150            0 :                                 auto conn_sock = accept(token_socket_, reinterpret_cast<struct sockaddr*>(&addr), &arglen);  // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
     151            0 :                                 fcntl(conn_sock, F_SETFL, O_NONBLOCK);                                                       // set O_NONBLOCK
     152              : 
     153            0 :                                 if (conn_sock == -1)
     154              :                                 {
     155            0 :                                         TLOG(TLVL_ERROR) << "Error status received from accept, exiting with code " << EXIT_FAILURE << ", errno=" << errno << " (" << strerror(errno) << ")";
     156            0 :                                         perror("accept");
     157            0 :                                         exit(EXIT_FAILURE);
     158              :                                 }
     159              : 
     160            0 :                                 receive_token_addrs_[conn_sock] = std::string(inet_ntoa(addr.sin_addr));
     161            0 :                                 TLOG(TLVL_DEBUG + 32) << "New fd is " << conn_sock << " for data-receiver at " << receive_token_addrs_[conn_sock];
     162              :                                 struct epoll_event ev;
     163            0 :                                 ev.events = EPOLLIN;
     164            0 :                                 ev.data.fd = conn_sock;
     165            0 :                                 if (epoll_ctl(token_epoll_fd_, EPOLL_CTL_ADD, conn_sock, &ev) == -1)
     166              :                                 {
     167            0 :                                         TLOG(TLVL_ERROR) << "Error status received from epoll_ctl, exiting with code " << EXIT_FAILURE << ", errno=" << errno << " (" << strerror(errno) << ")";
     168            0 :                                         perror("epoll_ctl: conn_sock");
     169            0 :                                         exit(EXIT_FAILURE);
     170              :                                 }
     171              :                         }
     172            0 :                         else if ((receive_token_events_[n].events & EPOLLIN) != 0)
     173              :                         {
     174            0 :                                 auto startTime = artdaq::MonitoredQuantity::getCurrentTime();
     175              : 
     176              :                                 detail::RoutingToken buff;
     177            0 :                                 int sts = recv(receive_token_events_[n].data.fd, &buff, sizeof(detail::RoutingToken), MSG_WAITALL);
     178            0 :                                 if (sts == 0)
     179              :                                 {
     180            0 :                                         TLOG(TLVL_WARNING) << "Received 0-size token from " << receive_token_addrs_[receive_token_events_[n].data.fd] << ", closing socket";
     181            0 :                                         receive_token_addrs_.erase(receive_token_events_[n].data.fd);
     182            0 :                                         close(receive_token_events_[n].data.fd);
     183            0 :                                         epoll_ctl(token_epoll_fd_, EPOLL_CTL_DEL, receive_token_events_[n].data.fd, nullptr);
     184              :                                 }
     185            0 :                                 else if (sts < 0 && errno == EAGAIN)
     186              :                                 {
     187            0 :                                         TLOG(TLVL_DEBUG + 32) << "No more tokens from this rank. Continuing poll loop.";
     188            0 :                                 }
     189            0 :                                 else if (sts < 0)
     190              :                                 {
     191            0 :                                         TLOG(TLVL_ERROR) << "Error reading from token socket: sts=" << sts << ", errno=" << errno;
     192            0 :                                         receive_token_addrs_.erase(receive_token_events_[n].data.fd);
     193            0 :                                         close(receive_token_events_[n].data.fd);
     194            0 :                                         epoll_ctl(token_epoll_fd_, EPOLL_CTL_DEL, receive_token_events_[n].data.fd, nullptr);
     195              :                                 }
     196            0 :                                 else if (sts == sizeof(detail::RoutingToken) && buff.header != TOKEN_MAGIC)
     197              :                                 {
     198            0 :                                         TLOG(TLVL_ERROR) << "Received invalid token from " << receive_token_addrs_[receive_token_events_[n].data.fd] << " sts=" << sts;
     199            0 :                                 }
     200            0 :                                 else if (sts == sizeof(detail::RoutingToken))
     201              :                                 {
     202            0 :                                         TLOG(TLVL_DEBUG + 32) << "Received token from " << buff.rank << " indicating " << buff.new_slots_free << " slots are free. (run=" << buff.run_number << ")";
     203            0 :                                         if (buff.run_number != run_number_)
     204              :                                         {
     205            0 :                                                 TLOG(TLVL_DEBUG + 32) << "Received token from a different run number! Current = " << run_number_ << ", token = " << buff.run_number << ", ignoring (n=" << buff.new_slots_free << ")";
     206              :                                         }
     207              :                                         else
     208              :                                         {
     209            0 :                                                 received_token_count_ += buff.new_slots_free;
     210            0 :                                                 policy_->AddReceiverToken(buff.rank, buff.new_slots_free);
     211              :                                         }
     212              :                                 }
     213            0 :                                 auto delta_time = artdaq::MonitoredQuantity::getCurrentTime() - startTime;
     214            0 :                                 if (statsHelperPtr_ != nullptr) { statsHelperPtr_->addSample(tokens_received_stat_key_, delta_time); }
     215              :                         }
     216              :                         else
     217              :                         {
     218            0 :                                 TLOG(TLVL_DEBUG + 32) << "Received event mask " << receive_token_events_[n].events << " from token fd " << receive_token_events_[n].data.fd;
     219              :                         }
     220              :                 }
     221              :         }
     222              : }
        

Generated by: LCOV version 2.0-1