LCOV - code coverage report
Current view: top level - artdaq/DAQrate/detail - TokenSender.cc (source / functions) Coverage Total Hit
Test: artdaq.info.cleaned Lines: 0.0 % 69 0
Test Date: 2025-09-04 00:45:34 Functions: 0.0 % 19 0

            Line data    Source code
       1              : #include "artdaq/DAQdata/Globals.hh"  // Before trace.h gets included in ConcurrentQueue (from GlobalQueue)
       2              : #define TRACE_NAME (app_name + "_TokenSender").c_str()
       3              : #include <dlfcn.h>
       4              : #include <chrono>
       5              : #include <cstring>
       6              : #include <fstream>
       7              : #include <iomanip>
       8              : #include <sstream>
       9              : #include <utility>
      10              : #include "artdaq/DAQrate/detail/TokenSender.hh"
      11              : 
      12              : #include "artdaq-core/Core/StatisticsCollection.hh"
      13              : #include "artdaq/DAQdata/TCPConnect.hh"
      14              : #include "artdaq/DAQrate/detail/RoutingPacket.hh"
      15              : #include "cetlib_except/exception.h"
      16              : 
      17              : namespace artdaq {
      18            0 : TokenSender::TokenSender(const fhicl::ParameterSet& pset)
      19            0 :     : initialized_(false)
      20            0 :     , send_routing_tokens_(pset.get<bool>("use_routing_manager", false))
      21            0 :     , token_port_(pset.get<int>("routing_token_port", 35555))
      22            0 :     , token_socket_(-1)
      23            0 :     , token_address_(pset.get<std::string>("routing_manager_hostname", "localhost"))
      24            0 :     , tokens_sent_(0)
      25            0 :     , run_number_(0)
      26              : {
      27            0 :         TLOG(TLVL_DEBUG + 32) << "TokenSender CONSTRUCTOR";
      28              : 
      29            0 :         setup_tokens_();
      30            0 :         TLOG(TLVL_DEBUG + 35) << "artdaq::TokenSender::TokenSender ctor - reader_thread_ initialized";
      31            0 :         initialized_ = true;
      32            0 : }
      33              : 
      34            0 : TokenSender::~TokenSender()
      35              : {
      36            0 :         TLOG(TLVL_INFO) << "Shutting down TokenSender, token_socket_: " << token_socket_;
      37              : 
      38            0 :         if (token_socket_ != -1)
      39              :         {
      40            0 :                 if (shutdown(token_socket_, 2) != 0 && errno == ENOTSOCK)
      41              :                 {
      42            0 :                         TLOG(TLVL_ERROR) << "Shutdown of token_socket_ resulted in ENOTSOCK. NOT Closing file descriptor!";
      43              :                 }
      44              :                 else
      45              :                 {
      46            0 :                         close(token_socket_);
      47              :                 }
      48            0 :                 token_socket_ = -1;
      49              :         }
      50            0 : }
      51              : 
      52            0 : void TokenSender::setup_tokens_()
      53              : {
      54            0 :         if (send_routing_tokens_)
      55              :         {
      56            0 :                 TLOG(TLVL_DEBUG + 32) << "Creating Routing Token sending socket";
      57            0 :                 auto start_time = std::chrono::steady_clock::now();
      58            0 :                 while (token_socket_ < 0 && TimeUtils::GetElapsedTime(start_time) < 30)
      59              :                 {
      60            0 :                         token_socket_ = TCPConnect(token_address_.c_str(), token_port_, 0, sizeof(detail::RoutingToken));
      61            0 :                         if (token_socket_ < 0)
      62              :                         {
      63            0 :                                 TLOG(TLVL_DEBUG + 33) << "Waited " << TimeUtils::GetElapsedTime(start_time) << " s for Routing Manager to open token socket";
      64            0 :                                 usleep(100000);
      65              :                         }
      66              :                 }
      67            0 :                 if (token_socket_ < 0)
      68              :                 {
      69            0 :                         TLOG(TLVL_ERROR) << "I failed to create the socket for sending Routing Tokens! err=" << strerror(errno);
      70            0 :                         exit(1);
      71              :                 }
      72            0 :                 TLOG(TLVL_INFO) << "Routing Token sending socket created successfully for address " << token_address_;
      73              :         }
      74            0 : }
      75              : 
      76            0 : void TokenSender::send_routing_token_(int nSlots, int run_number, int rank)
      77              : {
      78            0 :         TLOG(TLVL_DEBUG + 33) << "send_routing_token_ called, send_routing_tokens_=" << std::boolalpha << send_routing_tokens_;
      79            0 :         if (!send_routing_tokens_)
      80              :         {
      81            0 :                 return;
      82              :         }
      83            0 :         if (token_socket_ == -1)
      84              :         {
      85            0 :                 setup_tokens_();
      86              :         }
      87              :         detail::RoutingToken token;
      88            0 :         token.header = TOKEN_MAGIC;
      89            0 :         token.rank = rank;
      90            0 :         token.new_slots_free = nSlots;
      91            0 :         token.run_number = run_number;
      92              : 
      93            0 :         TLOG(TLVL_DEBUG + 33) << "Sending RoutingToken to " << token_address_ << ":" << token_port_;
      94            0 :         size_t sts = 0;
      95            0 :         while (sts < sizeof(detail::RoutingToken))
      96              :         {
      97            0 :                 auto res = send(token_socket_, reinterpret_cast<uint8_t*>(&token) + sts, sizeof(detail::RoutingToken) - sts, 0);  // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast,cppcoreguidelines-pro-bounds-pointer-arithmetic)
      98            0 :                 if (res < 0)
      99              :                 {
     100            0 :                         TLOG(TLVL_WARNING) << "Error on token_socket, reconnecting";
     101            0 :                         close(token_socket_);
     102            0 :                         token_socket_ = -1;
     103            0 :                         sts = 0;
     104            0 :                         setup_tokens_();
     105            0 :                         continue;
     106            0 :                 }
     107            0 :                 sts += res;
     108              :         }
     109            0 :         tokens_sent_ += nSlots;
     110            0 :         TLOG(TLVL_DEBUG + 33) << "Done sending RoutingToken to " << token_address_ << ":" << token_port_;
     111              : }
     112              : 
     113            0 : void TokenSender::SendRoutingToken(int nSlots, int run_number, int rank)
     114              : {
     115            0 :         while (!initialized_)
     116              :         {
     117            0 :                 usleep(1000);
     118              :         }
     119            0 :         if (!send_routing_tokens_)
     120              :         {
     121            0 :                 return;
     122              :         }
     123            0 :         boost::thread token([this, nSlots, run_number, rank] { send_routing_token_(nSlots, run_number, rank); });
     124            0 :         token.detach();
     125            0 :         usleep(0);  // Give up time slice
     126            0 : }
     127              : 
     128              : }  // namespace artdaq
        

Generated by: LCOV version 2.0-1