LCOV - code coverage report
Current view: top level - artdaq/DAQrate/detail - TableReceiver.cc (source / functions) Coverage Total Hit
Test: artdaq.info.cleaned Lines: 0.0 % 214 0
Test Date: 2025-09-04 00:45:34 Functions: 0.0 % 50 0

            Line data    Source code
       1              : #include "artdaq/DAQdata/Globals.hh"
       2              : #define TRACE_NAME (app_name + "_TableReceiver").c_str()
       3              : #include "artdaq/DAQrate/detail/TableReceiver.hh"
       4              : 
       5              : #include "artdaq/DAQdata/TCPConnect.hh"
       6              : #include "artdaq/DAQrate/detail/RoutingPacket.hh"
       7              : #include "canvas/Utilities/Exception.h"
       8              : 
       9              : #include <arpa/inet.h>
      10              : #include <netinet/in.h>
      11              : #include <poll.h>
      12              : #include <sys/socket.h>
      13              : #include <sys/types.h>
      14              : #include <chrono>
      15              : 
      16            0 : artdaq::TableReceiver::TableReceiver(const fhicl::ParameterSet& pset)
      17            0 :     : use_routing_manager_(pset.get<bool>("use_routing_manager", false))
      18            0 :     , should_stop_(false)
      19            0 :     , table_port_(pset.get<int>("table_update_port", 35556))
      20            0 :     , table_address_(pset.get<std::string>("routing_manager_hostname", "localhost"))
      21            0 :     , table_socket_(-1)
      22            0 :     , routing_table_last_(0)
      23            0 :     , routing_table_max_size_(pset.get<size_t>("routing_table_max_size", 1000))
      24            0 :     , routing_wait_time_(0)
      25            0 :     , routing_wait_time_count_(0)
      26            0 :     , routing_timeout_ms_((pset.get<size_t>("routing_timeout_ms", 1000)))
      27            0 :     , highest_sequence_id_routed_(0)
      28              : {
      29            0 :         TLOG(TLVL_DEBUG + 32) << "Received pset: " << pset.to_string();
      30              : 
      31            0 :         if (use_routing_manager_)
      32              :         {
      33            0 :                 startTableReceiverThread_();
      34              :         }
      35            0 : }
      36              : 
      37            0 : artdaq::TableReceiver::~TableReceiver()
      38              : {
      39            0 :         TLOG(TLVL_DEBUG + 32) << "Shutting down TableReceiver BEGIN";
      40            0 :         should_stop_ = true;
      41            0 :         disconnectFromRoutingManager_();
      42              : 
      43            0 :         if (routing_thread_ != nullptr)
      44              :         {
      45              :                 try
      46              :                 {
      47            0 :                         if (routing_thread_->joinable())
      48              :                         {
      49            0 :                                 routing_thread_->join();
      50              :                         }
      51              :                 }
      52            0 :                 catch (...)
      53              :                 {  // IGNORED
      54            0 :                 }
      55              :         }
      56            0 :         TLOG(TLVL_DEBUG + 32) << "Shutting down TableReceiver END.";
      57            0 : }
      58              : 
      59            0 : artdaq::TableReceiver::RoutingTable artdaq::TableReceiver::GetRoutingTable() const
      60              : {
      61            0 :         std::lock_guard<std::mutex> lk(routing_mutex_);
      62            0 :         RoutingTable routing_table_copy(routing_table_);
      63            0 :         return routing_table_copy;
      64            0 : }
      65              : 
      66            0 : artdaq::TableReceiver::RoutingTable artdaq::TableReceiver::GetAndClearRoutingTable()
      67              : {
      68            0 :         std::lock_guard<std::mutex> lk(routing_mutex_);
      69            0 :         RoutingTable routing_table_copy(routing_table_);
      70            0 :         routing_table_.clear();
      71            0 :         return routing_table_copy;
      72            0 : }
      73              : 
      74            0 : int artdaq::TableReceiver::GetRoutingTableEntry(artdaq::Fragment::sequence_id_t seqID)
      75              : {
      76            0 :         if (use_routing_manager_)
      77              :         {
      78            0 :                 sendTableUpdateRequest_(seqID);
      79            0 :                 auto routing_timeout_ms = routing_timeout_ms_;
      80            0 :                 if (routing_timeout_ms == 0)
      81              :                 {
      82            0 :                         routing_timeout_ms = 3600 * 1000;
      83              :                 }
      84            0 :                 auto condition_wait = routing_timeout_ms > 10 ? std::chrono::milliseconds(10) : std::chrono::milliseconds(routing_timeout_ms);
      85            0 :                 auto start_time = std::chrono::steady_clock::now();
      86            0 :                 while (!should_stop_ && TimeUtils::GetElapsedTimeMilliseconds(start_time) < routing_timeout_ms)
      87              :                 {
      88            0 :                         std::unique_lock<std::mutex> lk(routing_mutex_);
      89            0 :                         routing_cv_.wait_for(lk, condition_wait, [&]() { return routing_table_.count(seqID); });
      90            0 :                         if (routing_table_.count(seqID))
      91              :                         {
      92            0 :                                 routing_wait_time_.fetch_add(TimeUtils::GetElapsedTimeMicroseconds(start_time));
      93            0 :                                 return routing_table_.at(seqID);
      94              :                         }
      95            0 :                 }
      96            0 :                 TLOG(TLVL_WARNING) << "Bad Omen: Timeout receiving routing information for " << seqID
      97            0 :                                    << " in routing_timeout_ms (" << routing_timeout_ms_ << " ms)!";
      98              : 
      99            0 :                 routing_wait_time_.fetch_add(TimeUtils::GetElapsedTimeMicroseconds(start_time));
     100              :         }
     101            0 :         return ROUTING_FAILED;
     102              : }
     103              : 
     104            0 : void artdaq::TableReceiver::connectToRoutingManager_()
     105              : {
     106            0 :         auto start_time = std::chrono::steady_clock::now();
     107            0 :         while (table_socket_ < 0 && TimeUtils::GetElapsedTime(start_time) < 30)
     108              :         {
     109            0 :                 table_socket_ = TCPConnect(table_address_.c_str(), table_port_);
     110            0 :                 if (table_socket_ < 0)
     111              :                 {
     112            0 :                         TLOG(TLVL_DEBUG + 33) << "Waited " << TimeUtils::GetElapsedTime(start_time) << " s for Routing Manager to open table listen socket";
     113            0 :                         usleep(100000);
     114              :                 }
     115              :         }
     116            0 :         if (table_socket_ < 0)
     117              :         {
     118            0 :                 TLOG(TLVL_ERROR) << "Error creating socket for receiving table updates!";
     119            0 :                 exit(1);
     120              :         }
     121              : 
     122            0 :         detail::RoutingRequest startHdr(my_rank);
     123            0 :         write(table_socket_, &startHdr, sizeof(startHdr));
     124            0 : }
     125              : 
     126            0 : void artdaq::TableReceiver::disconnectFromRoutingManager_()
     127              : {
     128            0 :         detail::RoutingRequest endHdr(my_rank, detail::RoutingRequest::RequestMode::Disconnect);
     129            0 :         write(table_socket_, &endHdr, sizeof(endHdr));
     130            0 :         close(table_socket_);
     131            0 :         table_socket_ = -1;
     132            0 : }
     133              : 
     134            0 : void artdaq::TableReceiver::startTableReceiverThread_()
     135              : {
     136            0 :         if (routing_thread_ != nullptr && routing_thread_->joinable())
     137              :         {
     138            0 :                 routing_thread_->join();
     139              :         }
     140            0 :         TLOG(TLVL_INFO) << "Starting Routing Thread";
     141              :         try
     142              :         {
     143            0 :                 routing_thread_.reset(new boost::thread(&TableReceiver::receiveTableUpdatesLoop_, this));
     144              :                 char tname[16];
     145            0 :                 snprintf(tname, 16, "%s", "RoutingReceive");  // NOLINT
     146            0 :                 auto handle = routing_thread_->native_handle();
     147            0 :                 pthread_setname_np(handle, tname);
     148              :         }
     149            0 :         catch (const boost::exception& e)
     150              :         {
     151            0 :                 TLOG(TLVL_ERROR) << "Caught boost::exception starting Routing Table Receive thread: " << boost::diagnostic_information(e) << ", errno=" << errno;
     152            0 :                 std::cerr << "Caught boost::exception starting Routing Table Receive thread: " << boost::diagnostic_information(e) << ", errno=" << errno << std::endl;
     153            0 :                 exit(5);
     154            0 :         }
     155            0 : }
     156              : 
     157            0 : bool artdaq::TableReceiver::receiveTableUpdate_()
     158              : {
     159            0 :         TLOG(TLVL_DEBUG + 33) << __func__ << ": Polling table socket for new routes (address:port = " << table_address_ << ":" << table_port_ << ")";
     160            0 :         if (table_socket_ == -1)
     161              :         {
     162            0 :                 TLOG(TLVL_DEBUG + 32) << __func__ << ": Opening table socket";
     163            0 :                 connectToRoutingManager_();
     164              :         }
     165            0 :         if (table_socket_ == -1)
     166              :         {
     167            0 :                 TLOG(TLVL_DEBUG + 32) << __func__ << ": The table socket was not opened successfully.";
     168            0 :                 return false;
     169              :         }
     170              : 
     171              :         struct pollfd fd;
     172            0 :         fd.fd = table_socket_;
     173            0 :         fd.events = POLLIN | POLLPRI;
     174              : 
     175            0 :         auto res = poll(&fd, 1, 1000);
     176            0 :         if (res > 0)
     177              :         {
     178            0 :                 if (fd.revents & (POLLIN | POLLPRI))
     179              :                 {
     180            0 :                         TLOG(TLVL_DEBUG + 32) << __func__ << ": Going to receive RoutingPacketHeader";
     181            0 :                         artdaq::detail::RoutingPacketHeader hdr;
     182            0 :                         ssize_t stss = recv(table_socket_, &hdr, sizeof(hdr), MSG_WAITALL);
     183            0 :                         if (stss != sizeof(hdr))
     184              :                         {
     185            0 :                                 TLOG(TLVL_ERROR) << "Error reading Table Header from Table socket, errno=" << errno << " (" << strerror(errno) << ")";
     186            0 :                                 disconnectFromRoutingManager_();
     187            0 :                                 return false;
     188              :                         }
     189              : 
     190            0 :                         TLOG(TLVL_DEBUG + 32) << "receiveTableUpdatesLoop_: Checking for valid header with nEntries=" << hdr.nEntries << " header=" << std::hex << hdr.header;
     191            0 :                         if (hdr.header != ROUTING_MAGIC)
     192              :                         {
     193            0 :                                 TLOG(TLVL_DEBUG + 33) << __func__ << ": non-RoutingPacket received. No ROUTING_MAGIC.";
     194            0 :                                 return false;
     195              :                         }
     196            0 :                         if (hdr.nEntries == 0)
     197              :                         {
     198            0 :                                 TLOG(TLVL_DEBUG + 33) << __func__ << ": Empty Routing Table update received.";
     199            0 :                                 return false;
     200              :                         }
     201              : 
     202            0 :                         artdaq::detail::RoutingPacket buffer(hdr.nEntries);
     203            0 :                         size_t sts = 0;
     204            0 :                         size_t total = sizeof(artdaq::detail::RoutingPacketEntry) * hdr.nEntries;
     205            0 :                         while (sts < total)
     206              :                         {
     207            0 :                                 stss = read(table_socket_, reinterpret_cast<char*>(&buffer[0]) + sts, total - sts);
     208            0 :                                 sts += stss;
     209            0 :                                 TLOG(TLVL_DEBUG + 32) << "Read " << stss << " bytes, total " << sts << " / " << total;
     210            0 :                                 if (stss < 0)
     211              :                                 {
     212            0 :                                         TLOG(TLVL_ERROR) << "Error reading Table Data from Table socket, errno=" << errno << " (" << strerror(errno) << ")";
     213            0 :                                         disconnectFromRoutingManager_();
     214            0 :                                         return false;
     215              :                                 }
     216              :                         }
     217              : 
     218            0 :                         auto first = buffer.front().sequence_id;
     219            0 :                         auto last = buffer.back().sequence_id;
     220              : 
     221            0 :                         if (first + hdr.nEntries - 1 != last)
     222              :                         {
     223            0 :                                 TLOG(TLVL_ERROR) << __func__ << ": Skipping this RoutingPacket because the first (" << first << ") and last (" << last << ") entries are inconsistent (sz=" << hdr.nEntries << ")!";
     224            0 :                                 return false;
     225              :                         }
     226              : 
     227            0 :                         auto thisSeqID = first;
     228              : 
     229              :                         {
     230            0 :                                 std::lock_guard<std::mutex> lck(routing_mutex_);
     231            0 :                                 if (routing_table_.count(last) == 0)
     232              :                                 {
     233            0 :                                         for (auto entry : buffer)
     234              :                                         {
     235            0 :                                                 if (thisSeqID != entry.sequence_id)
     236              :                                                 {
     237            0 :                                                         TLOG(TLVL_ERROR) << __func__ << ": Aborting processing of this RoutingPacket because I encountered an inconsistent entry (seqid=" << entry.sequence_id << ", expected=" << thisSeqID << ")!";
     238            0 :                                                         last = thisSeqID - 1;
     239            0 :                                                         break;
     240              :                                                 }
     241            0 :                                                 thisSeqID++;
     242            0 :                                                 if (routing_table_.count(entry.sequence_id) != 0u)
     243              :                                                 {
     244            0 :                                                         if (routing_table_[entry.sequence_id] != entry.destination_rank)
     245              :                                                         {
     246            0 :                                                                 TLOG(TLVL_ERROR) << __func__ << ": Detected routing table corruption! Recevied update specifying that sequence ID " << entry.sequence_id
     247            0 :                                                                                  << " should go to rank " << entry.destination_rank << ", but I had already been told to send it to " << routing_table_[entry.sequence_id] << "!"
     248            0 :                                                                                  << " I will use the original value!";
     249              :                                                         }
     250            0 :                                                         continue;
     251            0 :                                                 }
     252            0 :                                                 if (entry.sequence_id < routing_table_last_)
     253              :                                                 {
     254            0 :                                                         continue;
     255              :                                                 }
     256            0 :                                                 routing_table_[entry.sequence_id] = entry.destination_rank;
     257            0 :                                                 TLOG(TLVL_DEBUG + 32) << __func__ << ": (my_rank=" << my_rank << ") received update: SeqID " << entry.sequence_id
     258            0 :                                                                       << " -> Rank " << entry.destination_rank;
     259              :                                         }
     260              :                                 }
     261              : 
     262            0 :                                 TLOG(TLVL_DEBUG + 32) << __func__ << ": There are now " << routing_table_.size() << " entries in the Routing Table";
     263            0 :                                 if (!routing_table_.empty())
     264              :                                 {
     265            0 :                                         TLOG(TLVL_DEBUG + 32) << __func__ << ": Last routing table entry is seqID=" << routing_table_.rbegin()->first;
     266              :                                 }
     267              : 
     268            0 :                                 auto counter = 0;
     269            0 :                                 for (auto& entry : routing_table_)
     270              :                                 {
     271            0 :                                         TLOG(TLVL_DEBUG + 40) << "Routing Table Entry" << counter << ": " << entry.first << " -> " << entry.second;
     272            0 :                                         counter++;
     273              :                                 }
     274            0 :                         }
     275            0 :                         routing_cv_.notify_all();
     276              : 
     277            0 :                         SendMetrics();
     278            0 :                         return true;
     279            0 :                 }
     280              :                 else
     281              :                 {
     282            0 :                         TLOG(TLVL_DEBUG + 32) << "Poll indicates socket closure. Disconnecting from Routing Manager";
     283            0 :                         disconnectFromRoutingManager_();
     284            0 :                         return false;
     285              :                 }
     286              :         }
     287            0 :         return false;
     288              : }
     289              : 
     290            0 : void artdaq::TableReceiver::receiveTableUpdatesLoop_()
     291              : {
     292              :         while (true)
     293              :         {
     294            0 :                 if (should_stop_)
     295              :                 {
     296            0 :                         TLOG(TLVL_DEBUG + 32) << __func__ << ": should_stop is " << std::boolalpha << should_stop_ << ", stopping";
     297            0 :                         disconnectFromRoutingManager_();
     298            0 :                         return;
     299              :                 }
     300              : 
     301            0 :                 receiveTableUpdate_();
     302            0 :         }
     303              : }
     304              : 
     305            0 : void artdaq::TableReceiver::sendTableUpdateRequest_(Fragment::sequence_id_t seq)
     306              : {
     307            0 :         TLOG(TLVL_DEBUG + 33) << "sendTableUpdateRequest_ BEGIN";
     308              :         {
     309            0 :                 std::lock_guard<std::mutex> lck(routing_mutex_);
     310            0 :                 if (routing_table_.count(seq))
     311              :                 {
     312            0 :                         TLOG(TLVL_DEBUG + 33) << "sendTableUpdateRequest_ END (no request sent): " << routing_table_.at(seq);
     313            0 :                         return;
     314              :                 }
     315            0 :         }
     316            0 :         if (table_socket_ == -1)
     317              :         {
     318            0 :                 connectToRoutingManager_();
     319              :         }
     320              : 
     321            0 :         TLOG(TLVL_DEBUG + 32) << "sendTableUpdateRequest_: Sending table update request for " << my_rank << ", sequence ID " << seq;
     322            0 :         detail::RoutingRequest pkt(my_rank, seq);
     323            0 :         write(table_socket_, &pkt, sizeof(pkt));
     324              : 
     325            0 :         TLOG(TLVL_DEBUG + 33) << "sendTableUpdateRequest_ END";
     326              : }
     327              : 
     328            0 : size_t artdaq::TableReceiver::GetRoutingTableEntryCount() const
     329              : {
     330            0 :         std::lock_guard<std::mutex> lck(routing_mutex_);
     331            0 :         return routing_table_.size();
     332            0 : }
     333              : 
     334            0 : size_t artdaq::TableReceiver::GetRemainingRoutingTableEntries() const
     335              : {
     336            0 :         std::lock_guard<std::mutex> lck(routing_mutex_);
     337              :         // Find the distance from the next highest sequence ID to the end of the list
     338            0 :         size_t dist = std::distance(routing_table_.upper_bound(highest_sequence_id_routed_), routing_table_.end());
     339            0 :         return dist;  // If dist == 1, there is one entry left.
     340            0 : }
     341              : 
     342            0 : void artdaq::TableReceiver::RemoveRoutingTableEntry(Fragment::sequence_id_t seq)
     343              : {
     344            0 :         TLOG(TLVL_DEBUG + 35) << "RemoveRoutingTableEntry: Removing sequence ID " << seq << " from routing table.";
     345            0 :         std::lock_guard<std::mutex> lck(routing_mutex_);
     346              :         //      while (routing_table_.size() > routing_table_max_size_)
     347              :         //      {
     348              :         //              routing_table_.erase(routing_table_.begin());
     349              :         //      }
     350            0 :         if (routing_table_.find(seq) != routing_table_.end())
     351              :         {
     352            0 :                 routing_table_.erase(routing_table_.find(seq));
     353              :         }
     354            0 : }
     355              : 
     356            0 : void artdaq::TableReceiver::SendMetrics() const
     357              : {
     358            0 :         if (metricMan)
     359              :         {
     360            0 :                 TLOG(TLVL_DEBUG + 34) << "sending metrics";
     361            0 :                 if (use_routing_manager_)
     362              :                 {
     363            0 :                         metricMan->sendMetric("Routing Table Size", GetRoutingTableEntryCount(), "events", 2, MetricMode::LastPoint);
     364            0 :                         if (routing_wait_time_ > 0)
     365              :                         {
     366            0 :                                 metricMan->sendMetric("Routing Wait Time", static_cast<double>(routing_wait_time_.load()) / 1000000, "s", 2, MetricMode::Average);
     367            0 :                                 routing_wait_time_ = 0;
     368              :                         }
     369              :                 }
     370              :         }
     371            0 : }
        

Generated by: LCOV version 2.0-1