LCOV - code coverage report
Current view: top level - artdaq/DAQrate - RequestBuffer.cc (source / functions) Coverage Total Hit
Test: artdaq.info.cleaned Lines: 70.7 % 99 70
Test Date: 2025-09-04 00:45:34 Functions: 70.0 % 20 14

            Line data    Source code
       1              : #include "artdaq/DAQdata/Globals.hh"
       2              : #define TRACE_NAME (app_name + "_RequestBuffer").c_str()  // include these 2 first -
       3              : 
       4              : #include "artdaq/DAQrate/RequestBuffer.hh"
       5              : 
       6           27 : artdaq::RequestBuffer::RequestBuffer(artdaq::Fragment::sequence_id_t request_increment)
       7              : 
       8           27 :     : requests_()
       9           27 :     , request_timing_()
      10           27 :     , highest_seen_request_(0)
      11           27 :     , last_next_request_(0)
      12           27 :     , out_of_order_requests_()
      13           27 :     , request_increment_(request_increment)
      14           54 :     , receiver_running_(false)
      15              : {
      16           27 : }
      17              : 
      18           27 : artdaq::RequestBuffer::~RequestBuffer() {}
      19              : 
      20       400062 : void artdaq::RequestBuffer::push(artdaq::Fragment::sequence_id_t seq, artdaq::Fragment::timestamp_t ts)
      21              : {
      22       400062 :         std::lock_guard<std::mutex> tlk(request_mutex_);
      23       400062 :         if (requests_.count(seq) && requests_[seq] != ts)
      24              :         {
      25            0 :                 TLOG(TLVL_ERROR) << "Received conflicting request for SeqID "
      26            0 :                                  << seq << "!"
      27            0 :                                  << " Old ts=" << requests_[seq]
      28            0 :                                  << ", new ts=" << ts << ". Keeping OLD!";
      29              :         }
      30       400062 :         else if (!requests_.count(seq))
      31              :         {
      32       400062 :                 int delta = seq - highest_seen_request_;
      33       800124 :                 TLOG(TLVL_DEBUG + 36) << "Received request for sequence ID " << seq
      34       400062 :                                       << " and timestamp " << ts << " (delta: " << delta << ")";
      35       400062 :                 if (delta <= 0 || out_of_order_requests_.count(seq))
      36              :                 {
      37            0 :                         TLOG(TLVL_DEBUG + 36) << "Already serviced this request ( sequence ID " << seq << ")! Ignoring...";
      38              :                 }
      39              :                 else
      40              :                 {
      41       400062 :                         requests_[seq] = ts;
      42       400062 :                         request_timing_[seq] = std::chrono::steady_clock::now();
      43              :                 }
      44              :         }
      45       400062 :         request_cv_.notify_all();
      46       400062 : }
      47              : 
      48            1 : void artdaq::RequestBuffer::reset()
      49              : {
      50            1 :         std::lock_guard<std::mutex> lk(request_mutex_);
      51            1 :         requests_.clear();
      52            1 :         request_timing_.clear();
      53            1 :         highest_seen_request_ = 0;
      54            1 :         last_next_request_ = 0;
      55            1 :         out_of_order_requests_.clear();
      56            1 : }
      57              : 
      58              : /// <summary>
      59              : /// Get the current requests
      60              : /// </summary>
      61              : /// <returns>Map relating sequence IDs to timestamps</returns>
      62              : 
      63           79 : std::map<artdaq::Fragment::sequence_id_t, artdaq::Fragment::timestamp_t> artdaq::RequestBuffer::GetRequests() const
      64              : {
      65           79 :         std::lock_guard<std::mutex> lk(request_mutex_);
      66           79 :         std::map<artdaq::Fragment::sequence_id_t, Fragment::timestamp_t> out;
      67       300151 :         for (auto& in : requests_)
      68              :         {
      69       300072 :                 out[in.first] = in.second;
      70              :         }
      71          158 :         return out;
      72           79 : }
      73              : 
      74            0 : std::pair<artdaq::Fragment::sequence_id_t, artdaq::Fragment::timestamp_t> artdaq::RequestBuffer::GetNextRequest()
      75              : {
      76            0 :         std::lock_guard<std::mutex> lk(request_mutex_);
      77              : 
      78            0 :         auto it = requests_.begin();
      79            0 :         while (it != requests_.end() && it->first <= last_next_request_) { ++it; }
      80              : 
      81            0 :         if (it == requests_.end())
      82              :         {
      83            0 :                 return std::make_pair<artdaq::Fragment::sequence_id_t, artdaq::Fragment::timestamp_t>(0, 0);
      84              :         }
      85              : 
      86            0 :         last_next_request_ = it->first;
      87            0 :         return *it;
      88            0 : }
      89              : 
      90       300076 : void artdaq::RequestBuffer::RemoveRequest(artdaq::Fragment::sequence_id_t reqID)
      91              : {
      92       600152 :         TLOG(TLVL_DEBUG + 35) << "RemoveRequest: Removing request for id " << reqID;
      93       300076 :         std::lock_guard<std::mutex> lk(request_mutex_);
      94       300076 :         requests_.erase(reqID);
      95              : 
      96       300076 :         if (reqID > highest_seen_request_)
      97              :         {
      98       600136 :                 TLOG(TLVL_DEBUG + 35) << "RemoveRequest: out_of_order_requests_.size() == " << out_of_order_requests_.size() << ", reqID=" << reqID << ", expected=" << highest_seen_request_ + request_increment_;
      99       300068 :                 if (out_of_order_requests_.size() || reqID != highest_seen_request_ + request_increment_)
     100              :                 {
     101           32 :                         out_of_order_requests_.insert(reqID);
     102              : 
     103           32 :                         auto it = out_of_order_requests_.begin();
     104           46 :                         while (it != out_of_order_requests_.end())  // Stop accounting for requests after stop
     105              :                         {
     106           40 :                                 if (*it == highest_seen_request_ + request_increment_)
     107              :                                 {
     108           14 :                                         highest_seen_request_ = *it;
     109           14 :                                         it = out_of_order_requests_.erase(it);
     110              :                                 }
     111              :                                 else
     112              :                                 {
     113           26 :                                         break;
     114              :                                 }
     115              :                         }
     116              :                 }
     117              :                 else  // no out-of-order requests and this request is highest seen + request_increment_
     118              :                 {
     119       300036 :                         highest_seen_request_ = reqID;
     120              :                 }
     121       600136 :                 TLOG(TLVL_DEBUG + 35) << "RemoveRequest: reqID=" << reqID << " Setting highest_seen_request_ to " << highest_seen_request_;
     122              :         }
     123       300076 :         if (metricMan && request_timing_.count(reqID))
     124              :         {
     125      2100406 :                 metricMan->sendMetric("Request Response Time", TimeUtils::GetElapsedTime(request_timing_[reqID]), "seconds", 2, MetricMode::Average);
     126              :         }
     127       300076 :         request_timing_.erase(reqID);
     128       300076 : }
     129              : 
     130              : /// <summary>
     131              : /// Clear all requests from the map
     132              : /// </summary>
     133              : 
     134            0 : void artdaq::RequestBuffer::ClearRequests()
     135              : {
     136            0 :         std::lock_guard<std::mutex> lk(request_mutex_);
     137            0 :         requests_.clear();
     138            0 : }
     139              : 
     140              : /// <summary>
     141              : /// Get the current requests, then clear the map
     142              : /// </summary>
     143              : /// <returns>Map relating sequence IDs to timestamps</returns>
     144              : 
     145            0 : std::map<artdaq::Fragment::sequence_id_t, artdaq::Fragment::timestamp_t> artdaq::RequestBuffer::GetAndClearRequests()
     146              : {
     147            0 :         std::lock_guard<std::mutex> lk(request_mutex_);
     148            0 :         std::map<artdaq::Fragment::sequence_id_t, Fragment::timestamp_t> out;
     149            0 :         for (auto& in : requests_)
     150              :         {
     151            0 :                 out[in.first] = in.second;
     152              :         }
     153            0 :         if (requests_.size()) { highest_seen_request_ = requests_.rbegin()->first; }
     154            0 :         out_of_order_requests_.clear();
     155            0 :         requests_.clear();
     156            0 :         request_timing_.clear();
     157            0 :         return out;
     158            0 : }
     159              : 
     160              : /// <summary>
     161              : /// Get the number of requests currently stored in the RequestReceiver
     162              : /// </summary>
     163              : /// <returns>The number of requests stored in the RequestReceiver</returns>
     164              : 
     165          179 : size_t artdaq::RequestBuffer::size()
     166              : {
     167          179 :         std::lock_guard<std::mutex> tlk(request_mutex_);
     168          358 :         return requests_.size();
     169          179 : }
     170              : 
     171              : /// <summary>
     172              : /// Wait for a new request message, up to the timeout given
     173              : /// </summary>
     174              : /// <param name="timeout_ms">Milliseconds to wait for a new request to arrive</param>
     175              : /// <returns>True if any requests are present in the request map</returns>
     176              : 
     177          100 : bool artdaq::RequestBuffer::WaitForRequests(int timeout_ms)
     178              : {
     179          100 :         std::unique_lock<std::mutex> lk(request_mutex_);  // Lock needed by wait_for
     180              :         // See if we have to wait at all
     181          100 :         if (requests_.size() > 0) return true;
     182              :         // If we do have to wait, check requests_.size to make sure we're not being notified spuriously
     183          300 :         return request_cv_.wait_for(lk, std::chrono::milliseconds(timeout_ms), [this]() { return requests_.size() > 0; });
     184          100 : }
     185              : 
     186              : /// <summary>
     187              : /// Get the time a given request was received
     188              : /// </summary>
     189              : /// <param name="reqID">Request ID of the request</param>
     190              : /// <returns>steady_clock::time_point corresponding to when the request was received</returns>
     191              : 
     192           30 : std::chrono::steady_clock::time_point artdaq::RequestBuffer::GetRequestTime(artdaq::Fragment::sequence_id_t reqID)
     193              : {
     194           30 :         std::lock_guard<std::mutex> lk(request_mutex_);
     195           60 :         return request_timing_.count(reqID) ? request_timing_[reqID] : std::chrono::steady_clock::now();
     196           30 : }
        

Generated by: LCOV version 2.0-1