LCOV - code coverage report
Current view: top level - artdaq/DAQrate/detail - RequestReceiver.cc (source / functions) Coverage Total Hit
Test: artdaq.info.cleaned Lines: 0.0 % 162 0
Test Date: 2025-09-04 00:45:34 Functions: 0.0 % 33 0

            Line data    Source code
       1              : #include "artdaq/DAQdata/Globals.hh"
       2              : #define TRACE_NAME (app_name + "_RequestReceiver").c_str()
       3              : 
       4              : #include "artdaq/DAQdata/Globals.hh"
       5              : #include "artdaq/DAQrate/detail/RequestMessage.hh"
       6              : #include "artdaq/DAQrate/detail/RequestReceiver.hh"
       7              : 
       8              : #include <boost/exception/all.hpp>
       9              : #include <boost/throw_exception.hpp>
      10              : 
      11              : #include <iterator>
      12              : #include <limits>
      13              : 
      14              : #include "canvas/Utilities/Exception.h"
      15              : #include "cetlib_except/exception.h"
      16              : #include "fhiclcpp/ParameterSet.h"
      17              : 
      18              : #include "artdaq-core/Data/ContainerFragmentLoader.hh"
      19              : #include "artdaq-core/Data/Fragment.hh"
      20              : #include "artdaq-core/Utilities/ExceptionHandler.hh"
      21              : #include "artdaq-core/Utilities/SimpleLookupPolicy.hh"
      22              : #include "artdaq-core/Utilities/TimeUtils.hh"
      23              : 
      24              : #include <arpa/inet.h>
      25              : #include <netinet/in.h>
      26              : #include <sys/poll.h>
      27              : #include <algorithm>
      28              : #include <fstream>
      29              : #include <iomanip>
      30              : #include <iostream>
      31              : #include <iterator>
      32              : #include "artdaq/DAQdata/TCPConnect.hh"
      33              : 
      34            0 : artdaq::RequestReceiver::RequestReceiver()
      35            0 :     : request_stop_requested_(false)
      36            0 :     , requests_received_(0)
      37            0 :     , should_stop_(false)
      38            0 :     , request_addr_("227.128.12.26")
      39            0 :     , receive_requests_(false)
      40            0 : {}
      41              : 
      42            0 : artdaq::RequestReceiver::RequestReceiver(const fhicl::ParameterSet& ps, std::shared_ptr<RequestBuffer> output_buffer)
      43            0 :     : request_stop_requested_(false)
      44            0 :     , requests_received_(0)
      45            0 :     , should_stop_(false)
      46            0 :     , request_port_(ps.get<int>("request_port", 3001))
      47            0 :     , request_addr_(ps.get<std::string>("request_address", "227.128.12.26"))
      48            0 :     , multicast_in_addr_(ps.get<std::string>("multicast_interface_ip", "0.0.0.0"))
      49            0 :     , receive_requests_(ps.get<bool>("receive_requests", false))
      50            0 :     , end_of_run_timeout_ms_(ps.get<size_t>("end_of_run_quiet_timeout_ms", 1000))
      51            0 :     , requests_(output_buffer)
      52              : {
      53            0 :         TLOG(TLVL_DEBUG + 32) << "RequestReceiver CONSTRUCTOR ps: " << ps.to_string();
      54            0 :         if (receive_requests_)
      55              :         {
      56            0 :                 setupRequestListener();
      57              :         }
      58            0 : }
      59              : 
      60            0 : void artdaq::RequestReceiver::setupRequestListener()
      61              : {
      62            0 :         TLOG(TLVL_INFO) << "Setting up request listen socket, rank=" << my_rank << ", address=" << request_addr_ << ":" << request_port_
      63            0 :                         << ", multicast interface=" << multicast_in_addr_;
      64            0 :         request_socket_ = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
      65            0 :         if (request_socket_ < 0)
      66              :         {
      67            0 :                 TLOG(TLVL_ERROR) << "Error creating socket for receiving data requests! err=" << strerror(errno);
      68            0 :                 exit(1);
      69              :         }
      70              : 
      71              :         struct sockaddr_in si_me_request;
      72              : 
      73            0 :         int yes = 1;
      74            0 :         if (setsockopt(request_socket_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)) < 0)
      75              :         {
      76            0 :                 TLOG(TLVL_ERROR) << "Unable to enable port reuse on request socket, err=" << strerror(errno);
      77            0 :                 exit(1);
      78              :         }
      79            0 :         memset(&si_me_request, 0, sizeof(si_me_request));
      80            0 :         si_me_request.sin_family = AF_INET;
      81            0 :         si_me_request.sin_port = htons(request_port_);
      82            0 :         si_me_request.sin_addr.s_addr = htonl(INADDR_ANY);
      83            0 :         if (bind(request_socket_, reinterpret_cast<struct sockaddr*>(&si_me_request), sizeof(si_me_request)) == -1)  // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
      84              :         {
      85            0 :                 TLOG(TLVL_ERROR) << "Cannot bind request socket to port " << request_port_ << ", err=" << strerror(errno);
      86            0 :                 exit(1);
      87              :         }
      88              : 
      89            0 :         if (request_addr_ != "localhost")
      90              :         {
      91              :                 struct ip_mreq mreq;
      92            0 :                 int sts = ResolveHost(request_addr_.c_str(), mreq.imr_multiaddr);
      93            0 :                 if (sts == -1)
      94              :                 {
      95            0 :                         TLOG(TLVL_ERROR) << "Unable to resolve multicast request address, err=" << strerror(errno);
      96            0 :                         exit(1);
      97              :                 }
      98            0 :                 sts = GetInterfaceForNetwork(multicast_in_addr_.c_str(), mreq.imr_interface);
      99            0 :                 if (sts == -1)
     100              :                 {
     101            0 :                         TLOG(TLVL_ERROR) << "Unable to determine the multicast network interface for " << multicast_in_addr_;
     102            0 :                         exit(1);
     103              :                 }
     104              :                 char addr_str[INET_ADDRSTRLEN];
     105            0 :                 inet_ntop(AF_INET, &(mreq.imr_interface), addr_str, INET_ADDRSTRLEN);
     106            0 :                 TLOG(TLVL_INFO) << "Successfully determined the multicast network interface for " << multicast_in_addr_ << ": " << addr_str << " (RequestReceiver)";
     107            0 :                 if (setsockopt(request_socket_, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)) < 0)
     108              :                 {
     109            0 :                         TLOG(TLVL_ERROR) << "Unable to join multicast group, err=" << strerror(errno);
     110            0 :                         exit(1);
     111              :                 }
     112              :         }
     113            0 :         TLOG(TLVL_INFO) << "Done setting up request socket, rank=" << my_rank;
     114            0 : }
     115              : 
     116            0 : artdaq::RequestReceiver::~RequestReceiver()
     117              : {
     118            0 :         stopRequestReception(true);
     119            0 : }
     120              : 
     121            0 : void artdaq::RequestReceiver::stopRequestReception(bool force)
     122              : {
     123            0 :         std::unique_lock<std::mutex> lk(state_mutex_);
     124            0 :         if (!receive_requests_) return;
     125            0 :         if (requests_received_ == 0 && !force)
     126              :         {
     127            0 :                 TLOG(TLVL_ERROR) << "Stop request received by RequestReceiver, but no requests have ever been received." << std::endl
     128            0 :                                  << "Check that UDP port " << request_port_ << " is open in the firewall config.";
     129              :         }
     130            0 :         should_stop_ = true;
     131            0 :         if (running_)
     132              :         {
     133            0 :                 TLOG(TLVL_DEBUG + 32) << "Joining requestThread";
     134              :                 try
     135              :                 {
     136            0 :                         if (requestThread_.joinable())
     137              :                         {
     138            0 :                                 requestThread_.join();
     139              :                         }
     140              :                 }
     141            0 :                 catch (...)
     142              :                 {
     143              :                         // IGNORED
     144            0 :                 }
     145            0 :                 bool once = true;
     146            0 :                 while (running_)
     147              :                 {
     148            0 :                         if (once)
     149              :                         {
     150            0 :                                 TLOG(TLVL_ERROR) << "running_ is true after thread join! Should NOT happen";
     151              :                         }
     152            0 :                         once = false;
     153            0 :                         usleep(10000);
     154              :                 }
     155              :         }
     156              : 
     157            0 :         if (request_socket_ != -1)
     158              :         {
     159            0 :                 close(request_socket_);
     160            0 :                 request_socket_ = -1;
     161              :         }
     162            0 :         TLOG(TLVL_INFO) << "RequestReceiver stopped, received " << requests_received_ << " request messages";
     163            0 :         requests_received_ = 0;
     164            0 : }
     165              : 
     166            0 : void artdaq::RequestReceiver::startRequestReception()
     167              : {
     168            0 :         if (!receive_requests_) return;
     169            0 :         std::unique_lock<std::mutex> lk(state_mutex_);
     170            0 :         if (requestThread_.joinable())
     171              :         {
     172            0 :                 requestThread_.join();
     173              :         }
     174            0 :         should_stop_ = false;
     175            0 :         request_stop_requested_ = false;
     176              : 
     177            0 :         if (request_socket_ == -1)
     178              :         {
     179            0 :                 TLOG(TLVL_INFO) << "Connecting Request Reception socket";
     180            0 :                 setupRequestListener();
     181              :         }
     182              : 
     183            0 :         TLOG(TLVL_INFO) << "Starting Request Reception Thread";
     184              :         try
     185              :         {
     186            0 :                 requestThread_ = boost::thread(&RequestReceiver::receiveRequestsLoop, this);
     187              :                 char tname[16];                                             // Size 16 - see man page pthread_setname_np(3) and/or prctl(2)
     188            0 :                 snprintf(tname, sizeof(tname) - 1, "%d-ReqRecv", my_rank);  // NOLINT
     189            0 :                 tname[sizeof(tname) - 1] = '\0';                            // assure term. snprintf is not too evil :)
     190            0 :                 auto handle = requestThread_.native_handle();
     191            0 :                 pthread_setname_np(handle, tname);
     192              :         }
     193            0 :         catch (const boost::exception& e)
     194              :         {
     195            0 :                 TLOG(TLVL_ERROR) << "Caught boost::exception starting Request Receiver thread: " << boost::diagnostic_information(e) << ", errno=" << errno;
     196            0 :                 std::cerr << "Caught boost::exception starting Request Receiver thread: " << boost::diagnostic_information(e) << ", errno=" << errno << std::endl;
     197            0 :                 exit(5);
     198            0 :         }
     199            0 : }
     200              : 
     201            0 : void artdaq::RequestReceiver::receiveRequestsLoop()
     202              : {
     203            0 :         running_ = true;
     204            0 :         requests_->reset();
     205            0 :         requests_->setRunning(true);
     206            0 :         while (!should_stop_)
     207              :         {
     208            0 :                 TLOG(TLVL_DEBUG + 35) << "receiveRequestsLoop: Polling Request socket for new requests";
     209              : 
     210            0 :                 if (request_socket_ == -1)
     211              :                 {
     212            0 :                         setupRequestListener();
     213              :                 }
     214              : 
     215            0 :                 int ms_to_wait = 10;
     216              :                 struct pollfd ufds[1];
     217            0 :                 ufds[0].fd = request_socket_;
     218            0 :                 ufds[0].events = POLLIN | POLLPRI | POLLERR;
     219            0 :                 int rv = poll(ufds, 1, ms_to_wait);
     220              : 
     221              :                 // Continue loop if no message received or message does not have correct event ID
     222            0 :                 if (rv <= 0 || (ufds[0].revents != POLLIN && ufds[0].revents != POLLPRI))
     223              :                 {
     224            0 :                         if (rv == 1 && ((ufds[0].revents & (POLLNVAL | POLLERR | POLLHUP)) != 0))
     225              :                         {
     226            0 :                                 close(request_socket_);
     227            0 :                                 request_socket_ = -1;
     228              :                         }
     229            0 :                         if (request_stop_requested_ && TimeUtils::GetElapsedTimeMilliseconds(request_stop_timeout_) > end_of_run_timeout_ms_)
     230              :                         {
     231            0 :                                 break;
     232              :                         }
     233            0 :                         continue;
     234              :                 }
     235              : 
     236            0 :                 TLOG(TLVL_DEBUG + 34) << "Received packet on Request channel";
     237            0 :                 std::vector<uint8_t> buffer(MAX_REQUEST_MESSAGE_SIZE);
     238              :                 struct sockaddr_in from;
     239            0 :                 socklen_t len = sizeof(from);
     240            0 :                 auto sts = recvfrom(request_socket_, &buffer[0], MAX_REQUEST_MESSAGE_SIZE, 0, reinterpret_cast<struct sockaddr*>(&from), &len);  // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
     241            0 :                 if (sts < 0)
     242              :                 {
     243            0 :                         TLOG(TLVL_ERROR) << "Error receiving request message header err=" << strerror(errno);
     244            0 :                         close(request_socket_);
     245            0 :                         request_socket_ = -1;
     246            0 :                         continue;
     247            0 :                 }
     248              : 
     249            0 :                 auto hdr_buffer = reinterpret_cast<artdaq::detail::RequestHeader*>(&buffer[0]);  // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
     250            0 :                 TLOG(TLVL_DEBUG + 34) << "Request header word: 0x" << std::hex << hdr_buffer->header << std::dec << ", packet_count: " << hdr_buffer->packet_count << " from rank " << hdr_buffer->rank << ", " << inet_ntoa(from.sin_addr) << ":" << from.sin_port << ", run number: " << hdr_buffer->run_number;
     251            0 :                 if (!hdr_buffer->isValid())
     252              :                 {
     253            0 :                         continue;
     254              :                 }
     255              : 
     256              :                 // 19-Dec-2018, KAB: added check on current run number
     257            0 :                 if (run_number_ != 0 && hdr_buffer->run_number != run_number_)
     258              :                 {
     259            0 :                         TLOG(TLVL_WARNING) << "Received a Request Message with the wrong run number ("
     260            0 :                                            << hdr_buffer->run_number << "), expected " << run_number_
     261            0 :                                            << ", ignoring this request.";
     262            0 :                         continue;
     263            0 :                 }
     264              : 
     265            0 :                 requests_received_++;
     266              : 
     267            0 :                 if (hdr_buffer->mode == artdaq::detail::RequestMessageMode::EndOfRun)
     268              :                 {
     269            0 :                         TLOG(TLVL_INFO) << "Received Request Message with the EndOfRun marker. (Re)Starting 1-second timeout for receiving all outstanding requests...";
     270            0 :                         request_stop_timeout_ = std::chrono::steady_clock::now();
     271            0 :                         request_stop_requested_ = true;
     272              :                 }
     273              : 
     274            0 :                 std::vector<artdaq::detail::RequestPacket> pkt_buffer(hdr_buffer->packet_count);
     275            0 :                 memcpy(&pkt_buffer[0], &buffer[sizeof(artdaq::detail::RequestHeader)], sizeof(artdaq::detail::RequestPacket) * hdr_buffer->packet_count);
     276              : 
     277            0 :                 if (should_stop_)
     278              :                 {
     279            0 :                         break;
     280              :                 }
     281              : 
     282            0 :                 for (auto& buffer : pkt_buffer)
     283              :                 {
     284            0 :                         TLOG(TLVL_DEBUG + 36) << "Request Packet: hdr=" << /*std::dec <<*/ buffer.header << ", seq=" << buffer.sequence_id << ", ts=" << buffer.timestamp;
     285            0 :                         if (!buffer.isValid()) continue;
     286            0 :                         requests_->push(buffer.sequence_id, buffer.timestamp);
     287              :                 }
     288            0 :         }
     289            0 :         TLOG(TLVL_DEBUG + 32) << "Ending Request Thread";
     290            0 :         running_ = false;
     291            0 :         requests_->setRunning(false);
     292            0 : }
        

Generated by: LCOV version 2.0-1