LCOV - code coverage report
Current view: top level - artdaq/DAQrate/detail - RequestSender.cc (source / functions) Coverage Total Hit
Test: artdaq.info.cleaned Lines: 75.5 % 139 105
Test Date: 2025-09-04 00:45:34 Functions: 66.7 % 33 22

            Line data    Source code
       1              : #include "TRACE/tracemf.h"
       2              : #include "artdaq/DAQdata/Globals.hh"  // Before trace.h gets included in ConcurrentQueue (from GlobalQueue)
       3              : #define TRACE_NAME (app_name + "_RequestSender").c_str()
       4              : #include "artdaq/DAQrate/detail/RequestSender.hh"
       5              : 
       6              : #include "artdaq/DAQdata/TCPConnect.hh"
       7              : 
       8              : #include "fhiclcpp/ParameterSet.h"
       9              : 
      10              : #include <boost/thread.hpp>
      11              : 
      12              : #include <dlfcn.h>
      13              : #include <chrono>
      14              : #include <cstring>
      15              : #include <fstream>
      16              : #include <iomanip>
      17              : #include <mutex>
      18              : #include <sstream>
      19              : #include <thread>
      20              : #include <utility>
      21              : 
      22              : namespace artdaq {
      23           19 : RequestSender::RequestSender(const fhicl::ParameterSet& pset)
      24           19 :     : send_requests_(pset.get<bool>("send_requests", false))
      25           19 :     , initialized_(false)
      26           57 :     , request_address_(pset.get<std::string>("request_address", "227.128.12.26"))
      27           38 :     , request_port_(pset.get<int>("request_port", 3001))
      28           38 :     , request_delay_(pset.get<size_t>("request_delay_ms", 0) * 1000)
      29           38 :     , request_shutdown_timeout_us_(pset.get<size_t>("request_shutdown_timeout_us", 100000))
      30           19 :     , request_socket_(-1)
      31           95 :     , multicast_out_addr_(pset.get<std::string>("multicast_interface_ip", pset.get<std::string>("output_address", "0.0.0.0")))
      32           19 :     , request_mode_(detail::RequestMessageMode::Normal)
      33           38 :     , min_request_interval_ms_(pset.get<size_t>("min_request_interval_ms", 100))
      34           19 :     , request_sending_(0)
      35           19 :     , requests_sent_(0)
      36           57 :     , run_number_(0)
      37              : {
      38           57 :         TLOG(TLVL_DEBUG) << "RequestSender CONSTRUCTOR pset=" << pset.to_string();
      39           19 :         setup_requests_();
      40              : 
      41           38 :         TLOG(TLVL_DEBUG + 35) << "artdaq::RequestSender::RequestSender ctor - reader_thread_ initialized";
      42           19 :         initialized_ = true;
      43           19 : }
      44              : 
      45           36 : RequestSender::~RequestSender()
      46              : {
      47           57 :         TLOG(TLVL_INFO) << "Shutting down RequestSender: Waiting for " << request_sending_.load() << " requests to be sent (total sent: " << requests_sent_ << ")";
      48              : 
      49           19 :         auto start_time = std::chrono::steady_clock::now();
      50              : 
      51           38 :         while (request_sending_.load() > 0 && request_shutdown_timeout_us_ + request_delay_ > TimeUtils::GetElapsedTimeMicroseconds(start_time))
      52              :         {
      53            0 :                 usleep(1000);
      54              :         }
      55              :         {
      56           19 :                 std::lock_guard<std::mutex> lk(request_mutex_);
      57           19 :                 std::lock_guard<std::mutex> lk2(request_send_mutex_);
      58           19 :         }
      59           57 :         TLOG(TLVL_INFO) << "Shutting down RequestSender: request_socket_: " << request_socket_;
      60           19 :         if (request_socket_ != -1)
      61              :         {
      62            1 :                 if (shutdown(request_socket_, 2) != 0 && errno == ENOTSOCK)
      63              :                 {
      64            0 :                         TLOG(TLVL_ERROR) << "Shutdown of request_socket_ resulted in ENOTSOCK. NOT Closing file descriptor!";
      65              :                 }
      66              :                 else
      67              :                 {
      68            1 :                         close(request_socket_);
      69              :                 }
      70            1 :                 request_socket_ = -1;
      71              :         }
      72           36 : }
      73              : 
      74            3 : void RequestSender::SetRequestMode(detail::RequestMessageMode mode)
      75              : {
      76              :         {
      77            3 :                 std::lock_guard<std::mutex> lk(request_mutex_);
      78            3 :                 request_mode_ = mode;
      79            3 :         }
      80            3 :         SendRequest(true);
      81            3 : }
      82              : 
      83           19 : void RequestSender::setup_requests_()
      84              : {
      85           19 :         if (send_requests_)
      86              :         {
      87            1 :                 request_socket_ = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
      88            1 :                 if (request_socket_ < 0)
      89              :                 {
      90            0 :                         TLOG(TLVL_ERROR) << "I failed to create the socket for sending Data Requests! err=" << strerror(errno);
      91            0 :                         exit(1);
      92              :                 }
      93            1 :                 int sts = ResolveHost(request_address_.c_str(), request_port_, request_addr_);
      94            1 :                 if (sts == -1)
      95              :                 {
      96            0 :                         TLOG(TLVL_ERROR) << "Unable to resolve Data Request address, err=" << strerror(errno);
      97            0 :                         exit(1);
      98              :                 }
      99              : 
     100              :                 /*              if (multicast_out_addr_ == "0.0.0.0")
     101              :                 {
     102              :                     char hostname[HOST_NAME_MAX];
     103              :                     sts = gethostname(hostname, HOST_NAME_MAX);
     104              :                     multicast_out_addr_ = std::string(hostname);
     105              :                     if (sts < 0)
     106              :                     {
     107              :                         TLOG(TLVL_ERROR) << "Could not get current hostname,  err=" << strerror(errno);
     108              :                         exit(1);
     109              :                     }
     110              :                 }*/
     111              : 
     112              :                 // For 0.0.0.0, use system-specified IP_MULTICAST_IF
     113            1 :                 if (multicast_out_addr_ != "localhost" && multicast_out_addr_ != "0.0.0.0")
     114              :                 {
     115              :                         struct in_addr addr;
     116            0 :                         sts = GetInterfaceForNetwork(multicast_out_addr_.c_str(), addr);
     117              :                         // sts = ResolveHost(multicast_out_addr_.c_str(), addr);
     118            0 :                         if (sts == -1)
     119              :                         {
     120            0 :                                 TLOG(TLVL_ERROR) << "Unable to determine the  multicast interface address for " << multicast_out_addr_ << ", err=" << strerror(errno);
     121            0 :                                 exit(1);
     122              :                         }
     123              :                         char addr_str[INET_ADDRSTRLEN];
     124            0 :                         inet_ntop(AF_INET, &(addr), addr_str, INET_ADDRSTRLEN);
     125            0 :                         TLOG(TLVL_INFO) << "Successfully determined the multicast network interface for " << multicast_out_addr_ << ": " << addr_str;
     126              : 
     127            0 :                         if (setsockopt(request_socket_, IPPROTO_IP, IP_MULTICAST_IF, &addr, sizeof(addr)) == -1)
     128              :                         {
     129            0 :                                 TLOG(TLVL_ERROR) << "Cannot set outgoing interface, err=" << strerror(errno);
     130            0 :                                 exit(1);
     131              :                         }
     132              :                 }
     133            1 :                 int yes = 1;
     134            1 :                 if (setsockopt(request_socket_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)) < 0)
     135              :                 {
     136            0 :                         TLOG(TLVL_ERROR) << "Unable to enable port reuse on request socket, err=" << strerror(errno);
     137            0 :                         exit(1);
     138              :                 }
     139            1 :                 if (setsockopt(request_socket_, IPPROTO_IP, IP_MULTICAST_LOOP, &yes, sizeof(yes)) < 0)
     140              :                 {
     141            0 :                         TLOG(TLVL_ERROR) << "Unable to enable multicast loopback on request socket, err=" << strerror(errno);
     142            0 :                         exit(1);
     143              :                 }
     144            1 :                 if (setsockopt(request_socket_, SOL_SOCKET, SO_BROADCAST, &yes, sizeof(yes)) == -1)
     145              :                 {
     146            0 :                         TLOG(TLVL_ERROR) << "Cannot set request socket to broadcast, err=" << strerror(errno);
     147            0 :                         exit(1);
     148              :                 }
     149              :         }
     150           19 : }
     151              : 
     152            4 : void RequestSender::do_send_request_()
     153              : {
     154            4 :         if (!send_requests_)
     155              :         {
     156            0 :                 request_sending_--;
     157            0 :                 return;
     158              :         }
     159            4 :         if (request_socket_ == -1)
     160              :         {
     161            0 :                 setup_requests_();
     162              :         }
     163              : 
     164            8 :         TLOG(TLVL_DEBUG + 33) << "Waiting for " << request_delay_ << " microseconds.";
     165            4 :         std::this_thread::sleep_for(std::chrono::microseconds(request_delay_));
     166              : 
     167            8 :         TLOG(TLVL_DEBUG + 33) << "Creating RequestMessage";
     168            4 :         detail::RequestMessage message;
     169            4 :         message.setRank(my_rank);
     170            4 :         message.setRunNumber(run_number_);
     171              :         {
     172            4 :                 std::lock_guard<std::mutex> lk(request_mutex_);
     173           10 :                 for (auto& req : active_requests_)
     174              :                 {
     175           12 :                         TLOG(TLVL_DEBUG + 36) << "Adding a request with sequence ID " << req.first << ", timestamp " << req.second << " to request message";
     176            6 :                         message.addRequest(req.first, req.second);
     177              :                 }
     178            8 :                 TLOG(TLVL_DEBUG + 33) << "Setting mode flag in Message Header to " << static_cast<int>(request_mode_);
     179            4 :                 message.setMode(request_mode_);
     180            4 :         }
     181              :         char str[INET_ADDRSTRLEN];
     182            4 :         inet_ntop(AF_INET, &(request_addr_.sin_addr), str, INET_ADDRSTRLEN);
     183            4 :         std::lock_guard<std::mutex> lk2(request_send_mutex_);
     184            8 :         TLOG(TLVL_DEBUG + 33) << "Sending request for " << message.size() << " events to multicast group " << str
     185            4 :                               << ", port " << request_port_ << ", interface " << multicast_out_addr_;
     186            4 :         auto buf = message.GetMessage();
     187            4 :         auto sts = sendto(request_socket_, &buf[0], buf.size(), 0, reinterpret_cast<struct sockaddr*>(&request_addr_), sizeof(request_addr_));  // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
     188            4 :         if (sts < 0 || static_cast<size_t>(sts) != buf.size())
     189              :         {
     190            0 :                 TLOG(TLVL_ERROR) << "Error sending request message err=" << strerror(errno) << "sts=" << sts;
     191            0 :                 request_socket_ = -1;
     192            0 :                 request_sending_--;
     193            0 :                 return;
     194              :         }
     195            8 :         TLOG(TLVL_DEBUG + 33) << "Done sending request sts=" << sts;
     196            4 :         request_sending_--;
     197            4 :         requests_sent_++;
     198            4 : }
     199              : 
     200         2540 : void RequestSender::SendRequest(bool endOfRunOnly)
     201              : {
     202         2540 :         while (!initialized_)
     203              :         {
     204            0 :                 usleep(1000);
     205              :         }
     206              : 
     207         2540 :         if (!send_requests_)
     208              :         {
     209         2536 :                 return;
     210              :         }
     211              :         {
     212            4 :                 std::lock_guard<std::mutex> lk(request_mutex_);
     213            4 :                 if (endOfRunOnly && request_mode_ != detail::RequestMessageMode::EndOfRun)
     214              :                 {
     215            0 :                         return;
     216              :                 }
     217            4 :         }
     218            4 :         last_request_send_time_ = std::chrono::steady_clock::now();
     219            4 :         request_sending_++;
     220            8 :         boost::thread request([this] { do_send_request_(); });
     221            4 :         request.detach();
     222            4 : }
     223              : 
     224          446 : void RequestSender::AddRequest(Fragment::sequence_id_t seqID, Fragment::timestamp_t timestamp)
     225              : {
     226          446 :         while (!initialized_)
     227              :         {
     228            0 :                 usleep(1000);
     229              :         }
     230              : 
     231              :         {
     232          446 :                 std::lock_guard<std::mutex> lk(request_mutex_);
     233          446 :                 if (active_requests_.count(seqID) == 0u)
     234              :                 {
     235          892 :                         TLOG(TLVL_DEBUG + 37) << "Adding request for sequence ID " << seqID << " and timestamp " << timestamp << " to request list.";
     236          446 :                         active_requests_[seqID] = timestamp;
     237              :                 }
     238              : 
     239          446 :                 while (active_requests_.size() > detail::RequestMessage::max_request_count())
     240              :                 {
     241            0 :                         TLOG(TLVL_WARNING) << "Erasing request with seqID " << active_requests_.begin()->first << " due to over-large request list size! (" << active_requests_.size() << " / " << detail::RequestMessage::max_request_count() << ")";
     242            0 :                         active_requests_.erase(active_requests_.begin());
     243              :                 }
     244          446 :         }
     245          446 :         SendRequest(TimeUtils::GetElapsedTimeMilliseconds(last_request_send_time_) < min_request_interval_ms_);
     246          446 : }
     247              : 
     248          441 : void RequestSender::RemoveRequest(Fragment::sequence_id_t seqID)
     249              : {
     250          441 :         while (!initialized_)
     251              :         {
     252            0 :                 usleep(1000);
     253              :         }
     254          441 :         std::lock_guard<std::mutex> lk(request_mutex_);
     255          882 :         TLOG(TLVL_DEBUG + 38) << "Removing request for sequence ID " << seqID << " from request list.";
     256          441 :         active_requests_.erase(seqID);
     257          441 : }
     258              : }  // namespace artdaq
        

Generated by: LCOV version 2.0-1