LCOV - code coverage report
Current view: top level - artdaq/DAQrate - FragmentBuffer.cc (source / functions) Coverage Total Hit
Test: artdaq.info.cleaned Lines: 80.6 % 582 469
Test Date: 2025-09-04 00:45:34 Functions: 77.0 % 87 67

            Line data    Source code
       1              : #include "artdaq/DAQdata/Globals.hh"
       2              : #define TRACE_NAME (app_name + "_FragmentBuffer").c_str()  // include these 2 first -
       3              : 
       4              : #include "artdaq/DAQrate/FragmentBuffer.hh"
       5              : 
       6              : #include <boost/exception/all.hpp>
       7              : #include <boost/throw_exception.hpp>
       8              : 
       9              : #include <iterator>
      10              : #include <limits>
      11              : 
      12              : #include "canvas/Utilities/Exception.h"
      13              : #include "cetlib_except/exception.h"
      14              : #include "fhiclcpp/ParameterSet.h"
      15              : 
      16              : #include "artdaq-core/Data/ContainerFragmentLoader.hh"
      17              : #include "artdaq-core/Data/Fragment.hh"
      18              : #include "artdaq-core/Utilities/ExceptionHandler.hh"
      19              : #include "artdaq-core/Utilities/SimpleLookupPolicy.hh"
      20              : #include "artdaq-core/Utilities/TimeUtils.hh"
      21              : 
      22              : #include <sys/poll.h>
      23              : #include <algorithm>
      24              : #include <fstream>
      25              : #include <iomanip>
      26              : #include <iostream>
      27              : #include <iterator>
      28              : #include "artdaq/DAQdata/TCPConnect.hh"
      29              : 
      30              : #define TLVL_ADDFRAGMENT 32
      31              : #define TLVL_CHECKSTOP 33
      32              : #define TLVL_WAITFORBUFFERREADY 34
      33              : #define TLVL_GETBUFFERSTATS 35
      34              : #define TLVL_CHECKDATABUFFER 36
      35              : #define TLVL_APPLYREQUESTS 37
      36              : #define TLVL_APPLYREQUESTS_VERBOSE 38
      37              : #define TLVL_SENDEMPTYFRAGMENTS 39
      38              : #define TLVL_CHECKWINDOWS 40
      39              : #define TLVL_EMPTYFRAGMENT 41
      40              : 
      41           28 : artdaq::FragmentBuffer::FragmentBuffer(const fhicl::ParameterSet& ps)
      42           28 :     : next_sequence_id_(1)
      43           28 :     , requestBuffer_()
      44           56 :     , bufferModeKeepLatest_(ps.get<bool>("buffer_mode_keep_latest", false))
      45           56 :     , windowOffset_(ps.get<Fragment::timestamp_t>("request_window_offset", 0))
      46           56 :     , windowWidth_(ps.get<Fragment::timestamp_t>("request_window_width", 0))
      47           56 :     , staleTimeout_(ps.get<Fragment::timestamp_t>("stale_fragment_timeout", 0))
      48           56 :     , expectedType_(ps.get<Fragment::type_t>("expected_fragment_type", Fragment::type_t(Fragment::EmptyFragmentType)))
      49           56 :     , uniqueWindows_(ps.get<bool>("request_windows_are_unique", true))
      50           56 :     , sendMissingFragments_(ps.get<bool>("send_missing_request_fragments", true))
      51           56 :     , missing_request_window_timeout_us_(ps.get<size_t>("missing_request_window_timeout_us", 5000000))
      52           56 :     , window_close_timeout_us_(ps.get<size_t>("window_close_timeout_us", 2000000))
      53           56 :     , error_on_empty_(ps.get<bool>("error_on_empty_fragment", false))
      54           56 :     , circularDataBufferMode_(ps.get<bool>("circular_buffer_mode", false))
      55           56 :     , maxDataBufferDepthFragments_(ps.get<int>("data_buffer_depth_fragments", 1000))
      56           56 :     , maxDataBufferDepthBytes_(ps.get<size_t>("data_buffer_depth_mb", 1000) * 1024 * 1024)
      57           28 :     , systemFragmentCount_(0)
      58          112 :     , should_stop_(false)
      59              : {
      60           84 :         auto fragment_ids = ps.get<std::vector<artdaq::Fragment::fragment_id_t>>("fragment_ids", std::vector<artdaq::Fragment::fragment_id_t>());
      61              : 
      62           56 :         TLOG(TLVL_DEBUG + 33) << "artdaq::FragmentBuffer::FragmentBuffer(ps)";
      63           56 :         int fragment_id = ps.get<int>("fragment_id", -99);
      64              : 
      65           28 :         if (fragment_id != -99)
      66              :         {
      67           22 :                 if (fragment_ids.size() != 0)
      68              :                 {
      69            1 :                         auto report = "Error in FragmentBuffer: can't both define \"fragment_id\" and \"fragment_ids\" in FHiCL document";
      70            3 :                         TLOG(TLVL_ERROR) << report;
      71            3 :                         throw cet::exception("FragmentBufferConfig") << report;
      72              :                 }
      73              :                 else
      74              :                 {
      75           21 :                         fragment_ids.emplace_back(fragment_id);
      76              :                 }
      77              :         }
      78              : 
      79           66 :         for (auto& id : fragment_ids)
      80              :         {
      81           39 :                 dataBuffers_[id] = std::make_shared<DataBuffer>();
      82           39 :                 dataBuffers_[id]->DataBufferDepthBytes = 0;
      83           39 :                 dataBuffers_[id]->DataBufferDepthFragments = 0;
      84           39 :                 dataBuffers_[id]->HighestRequestSeen = 0;
      85           39 :                 dataBuffers_[id]->BufferFragmentKept = false;
      86              :         }
      87              : 
      88           81 :         std::string modeString = ps.get<std::string>("request_mode", "ignored");
      89           27 :         if (modeString == "single" || modeString == "Single")
      90              :         {
      91            3 :                 mode_ = RequestMode::Single;
      92              :         }
      93           24 :         else if (modeString.find("buffer") != std::string::npos || modeString.find("Buffer") != std::string::npos)
      94              :         {
      95            5 :                 mode_ = RequestMode::Buffer;
      96              :         }
      97           19 :         else if (modeString == "window" || modeString == "Window")
      98              :         {
      99           14 :                 mode_ = RequestMode::Window;
     100              :         }
     101            5 :         else if (modeString.find("ignore") != std::string::npos || modeString.find("Ignore") != std::string::npos)
     102              :         {
     103            3 :                 mode_ = RequestMode::Ignored;
     104              :         }
     105            2 :         else if (modeString.find("sequence") != std::string::npos || modeString.find("Sequence") != std::string::npos)
     106              :         {
     107            2 :                 mode_ = RequestMode::SequenceID;
     108              :         }
     109           75 :         if (mode_ != RequestMode::Ignored && !ps.get<bool>("receive_requests", false))
     110              :         {
     111            3 :                 TLOG(TLVL_WARNING) << "Request Mode was requested as " << modeString << ", but is being set to Ignored because \"receive_requests\" was not set to true";
     112            1 :                 mode_ = RequestMode::Ignored;
     113              :         }
     114           54 :         TLOG(TLVL_DEBUG + 32) << "Request mode is " << printMode_();
     115           32 : }
     116              : 
     117           27 : artdaq::FragmentBuffer::~FragmentBuffer()
     118              : {
     119           81 :         TLOG(TLVL_INFO) << "Fragment Buffer Destructor; Clearing data buffers";
     120           27 :         Reset(true);
     121           27 : }
     122              : 
     123     99490266 : void artdaq::FragmentBuffer::Reset(bool stop)
     124              : {
     125     99490266 :         should_stop_ = stop;
     126     99490266 :         next_sequence_id_ = 1;
     127    198980544 :         for (auto& id : dataBuffers_)
     128              :         {
     129     99490278 :                 std::lock_guard<std::mutex> dlk(id.second->DataBufferMutex);
     130     99490278 :                 id.second->DataBufferDepthBytes = 0;
     131     99490278 :                 id.second->DataBufferDepthFragments = 0;
     132     99490278 :                 id.second->BufferFragmentKept = false;
     133     99490278 :                 id.second->DataBuffer.clear();
     134     99490278 :         }
     135              : 
     136              :         {
     137     99490266 :                 std::lock_guard<std::mutex> lk(systemFragmentMutex_);
     138     99490266 :                 systemFragments_.clear();
     139     99490266 :                 systemFragmentCount_ = 0;
     140     99490266 :         }
     141     99490266 : }
     142              : 
     143       300051 : void artdaq::FragmentBuffer::AddFragmentsToBuffer(FragmentPtrs frags)
     144              : {
     145       300051 :         std::unordered_map<Fragment::fragment_id_t, FragmentPtrs> frags_by_id;
     146       800264 :         while (!frags.empty())
     147              :         {
     148       500213 :                 auto dataIter = frags.begin();
     149       500213 :                 auto frag_id = (*dataIter)->fragmentID();
     150              : 
     151       500213 :                 if (Fragment::isBroadcastFragmentType((*dataIter)->type()))
     152              :                 {
     153            0 :                         std::lock_guard<std::mutex> lk(systemFragmentMutex_);
     154            0 :                         systemFragments_.emplace_back(std::move(*dataIter));
     155            0 :                         systemFragmentCount_++;
     156            0 :                         frags.erase(dataIter);
     157            0 :                         continue;
     158            0 :                 }
     159              : 
     160       500213 :                 if (!dataBuffers_.count(frag_id))
     161              :                 {
     162            0 :                         throw cet::exception("FragmentIDs") << "Received Fragment with Fragment ID " << frag_id << ", which is not in the declared Fragment IDs list!";
     163              :                 }
     164              : 
     165       500213 :                 frags_by_id[frag_id].emplace_back(std::move(*dataIter));
     166       500213 :                 frags.erase(dataIter);
     167              :         }
     168              : 
     169       300051 :         auto type_it = frags_by_id.begin();
     170       600133 :         while (type_it != frags_by_id.end())
     171              :         {
     172       300082 :                 auto frag_id = type_it->first;
     173              : 
     174       300082 :                 waitForDataBufferReady(frag_id);
     175       300082 :                 auto dataBuffer = dataBuffers_[frag_id];
     176       300082 :                 std::lock_guard<std::mutex> dlk(dataBuffer->DataBufferMutex);
     177       300082 :                 switch (mode_)
     178              :                 {
     179           14 :                         case RequestMode::Single: {
     180           14 :                                 auto dataIter = type_it->second.rbegin();
     181           28 :                                 TLOG(TLVL_ADDFRAGMENT) << "Adding Fragment with Fragment ID " << frag_id << ", Sequence ID " << (*dataIter)->sequenceID() << ", and Timestamp " << (*dataIter)->timestamp() << " to buffer";
     182           14 :                                 dataBuffer->DataBuffer.clear();
     183           14 :                                 dataBuffer->DataBufferDepthBytes = (*dataIter)->sizeBytes();
     184           14 :                                 dataBuffer->DataBuffer.emplace_back(std::move(*dataIter));
     185           14 :                                 dataBuffer->DataBufferDepthFragments = 1;
     186           14 :                                 type_it->second.clear();
     187              :                         }
     188           14 :                         break;
     189       300068 :                         case RequestMode::Buffer:
     190              :                         case RequestMode::Ignored:
     191              :                         case RequestMode::Window:
     192              :                         case RequestMode::SequenceID:
     193              :                         default:
     194       800262 :                                 while (!type_it->second.empty())
     195              :                                 {
     196       500194 :                                         auto dataIter = type_it->second.begin();
     197      1000388 :                                         TLOG(TLVL_ADDFRAGMENT) << "Adding Fragment with Fragment ID " << frag_id << ", Sequence ID " << (*dataIter)->sequenceID() << ", and Timestamp " << (*dataIter)->timestamp() << " to buffer";
     198              : 
     199       500194 :                                         dataBuffer->DataBufferDepthBytes += (*dataIter)->sizeBytes();
     200       500194 :                                         dataBuffer->DataBuffer.emplace_back(std::move(*dataIter));
     201       500194 :                                         type_it->second.erase(dataIter);
     202              :                                 }
     203       300068 :                                 dataBuffer->DataBufferDepthFragments = dataBuffer->DataBuffer.size();
     204       300068 :                                 break;
     205              :                 }
     206       300082 :                 getDataBufferStats(frag_id);
     207       300082 :                 ++type_it;
     208       300082 :         }
     209       300051 :         dataCondition_.notify_all();
     210       300051 : }
     211              : 
     212          364 : bool artdaq::FragmentBuffer::check_stop()
     213              : {
     214          728 :         TLOG(TLVL_CHECKSTOP) << "CFG::check_stop: should_stop=" << should_stop_.load();
     215              : 
     216          364 :         if (!should_stop_.load()) return false;
     217            4 :         if (mode_ == RequestMode::Ignored)
     218              :         {
     219            1 :                 return true;
     220              :         }
     221              : 
     222            3 :         if (requestBuffer_ != nullptr)
     223              :         {
     224              :                 // check_stop returns true if the CFG should stop. We should wait for the Request Buffer to report Request Receiver stopped before stopping.
     225            6 :                 TLOG(TLVL_DEBUG + 32) << "should_stop is true, requestBuffer_->isRunning() is " << std::boolalpha << requestBuffer_->isRunning();
     226            3 :                 if (!requestBuffer_->isRunning())
     227              :                 {
     228            1 :                         return true;
     229              :                 }
     230              :         }
     231            2 :         return false;
     232              : }
     233              : 
     234            0 : std::string artdaq::FragmentBuffer::printMode_()
     235              : {
     236            0 :         switch (mode_)
     237              :         {
     238            0 :                 case RequestMode::Single:
     239            0 :                         return "Single";
     240            0 :                 case RequestMode::Buffer:
     241            0 :                         return "Buffer";
     242            0 :                 case RequestMode::Window:
     243            0 :                         return "Window";
     244            0 :                 case RequestMode::Ignored:
     245            0 :                         return "Ignored";
     246            0 :                 case RequestMode::SequenceID:
     247            0 :                         return "SequenceID";
     248              :         }
     249              : 
     250            0 :         return "ERROR";
     251              : }
     252              : 
     253          302 : size_t artdaq::FragmentBuffer::dataBufferFragmentCount_()
     254              : {
     255          302 :         size_t count = 0;
     256          606 :         for (auto& id : dataBuffers_) count += id.second->DataBufferDepthFragments;
     257          302 :         count += systemFragmentCount_.load();
     258          302 :         return count;
     259              : }
     260              : 
     261       300082 : bool artdaq::FragmentBuffer::waitForDataBufferReady(Fragment::fragment_id_t id)
     262              : {
     263       300082 :         if (!dataBuffers_.count(id))
     264              :         {
     265            0 :                 TLOG(TLVL_ERROR) << "DataBufferError: "
     266            0 :                                  << "Error in FragmentBuffer: Cannot wait for data buffer for ID " << id << " because it does not exist!";
     267            0 :                 throw cet::exception("DataBufferError") << "Error in FragmentBuffer: Cannot wait for data buffer for ID " << id << " because it does not exist!";
     268              :         }
     269       300082 :         auto startwait = std::chrono::steady_clock::now();
     270       300082 :         auto first = true;
     271       300082 :         auto lastwaittime = 0ULL;
     272       300082 :         auto dataBuffer = dataBuffers_[id];
     273              : 
     274       319687 :         while (dataBufferIsTooLarge(id))
     275              :         {
     276        19605 :                 if (!circularDataBufferMode_)
     277              :                 {
     278        19605 :                         if (should_stop_.load())
     279              :                         {
     280            0 :                                 TLOG(TLVL_DEBUG + 32) << "Run ended while waiting for buffer to shrink!";
     281            0 :                                 getDataBufferStats(id);
     282            0 :                                 dataCondition_.notify_all();
     283            0 :                                 return false;
     284              :                         }
     285        19605 :                         auto waittime = TimeUtils::GetElapsedTimeMilliseconds(startwait);
     286              : 
     287        19605 :                         if (first || (waittime != lastwaittime && waittime % 1000 == 0))
     288              :                         {
     289        19605 :                                 std::lock_guard<std::mutex> lk(dataBuffer->DataBufferMutex);
     290        19605 :                                 if (dataBufferIsTooLarge(id))
     291              :                                 {
     292        47748 :                                         TLOG(TLVL_WARNING) << "Bad Omen: Data Buffer has exceeded its size limits. "
     293        15916 :                                                            << "(seq_id=" << next_sequence_id_ << ", frag_id=" << id
     294        15916 :                                                            << ", frags=" << dataBuffer->DataBufferDepthFragments << "/" << maxDataBufferDepthFragments_
     295        15916 :                                                            << ", szB=" << dataBuffer->DataBufferDepthBytes << "/" << maxDataBufferDepthBytes_ << ")"
     296        31832 :                                                            << ", timestamps=" << dataBuffer->DataBuffer.front()->timestamp() << "-" << dataBuffer->DataBuffer.back()->timestamp();
     297        31832 :                                         TLOG(TLVL_DEBUG + 33) << "Bad Omen: Possible causes include requests not getting through or Ignored-mode BR issues";
     298              : 
     299        15916 :                                         if (metricMan)
     300              :                                         {
     301       111412 :                                                 metricMan->sendMetric("Bad Omen wait time", waittime / 1000.0, "s", 1, MetricMode::LastPoint);
     302              :                                         }
     303              :                                 }
     304        19605 :                                 first = false;
     305        19605 :                         }
     306        19605 :                         if (waittime % 5 && waittime != lastwaittime)
     307              :                         {
     308            0 :                                 TLOG(TLVL_WAITFORBUFFERREADY) << "getDataLoop: Data Retreival paused for " << waittime << " ms waiting for data buffer to drain";
     309              :                         }
     310        19605 :                         lastwaittime = waittime;
     311        19605 :                         usleep(1000);
     312              :                 }
     313              :                 else
     314              :                 {
     315            0 :                         std::lock_guard<std::mutex> lk(dataBuffer->DataBufferMutex);
     316            0 :                         if (dataBufferIsTooLarge(id))
     317              :                         {
     318            0 :                                 auto begin = dataBuffer->DataBuffer.begin();
     319            0 :                                 if (begin == dataBuffer->DataBuffer.end())
     320              :                                 {
     321            0 :                                         TLOG(TLVL_WARNING) << "Data buffer is reported as too large, but doesn't contain any Fragments! Possible corrupt memory!";
     322            0 :                                         continue;
     323            0 :                                 }
     324            0 :                                 if (*begin)
     325              :                                 {
     326            0 :                                         TLOG(TLVL_WAITFORBUFFERREADY) << "waitForDataBufferReady: Dropping Fragment with timestamp " << (*begin)->timestamp() << " from data buffer (Buffer over-size, circular data buffer mode)";
     327              : 
     328            0 :                                         dataBuffer->DataBufferDepthBytes -= (*begin)->sizeBytes();
     329            0 :                                         dataBuffer->DataBuffer.erase(begin);
     330            0 :                                         dataBuffer->DataBufferDepthFragments = dataBuffer->DataBuffer.size();
     331            0 :                                         dataBuffer->BufferFragmentKept = false;  // If any Fragments are removed from data buffer, then we know we don't have to ignore the first one anymore
     332              :                                 }
     333              :                         }
     334            0 :                 }
     335              :         }
     336       300082 :         return true;
     337       300082 : }
     338              : 
     339       389444 : bool artdaq::FragmentBuffer::dataBufferIsTooLarge(Fragment::fragment_id_t id)
     340              : {
     341       389444 :         if (!dataBuffers_.count(id))
     342              :         {
     343            0 :                 TLOG(TLVL_ERROR) << "DataBufferError: "
     344            0 :                                  << "Error in FragmentBuffer: Cannot check size of data buffer for ID " << id << " because it does not exist!";
     345            0 :                 throw cet::exception("DataBufferError") << "Error in FragmentBuffer: Cannot check size of data buffer for ID " << id << " because it does not exist!";
     346              :         }
     347       389444 :         auto dataBuffer = dataBuffers_[id];
     348      1082746 :         return (maxDataBufferDepthFragments_ > 0 && dataBuffer->DataBufferDepthFragments.load() > maxDataBufferDepthFragments_) ||
     349       997160 :                (maxDataBufferDepthBytes_ > 0 && dataBuffer->DataBufferDepthBytes.load() > maxDataBufferDepthBytes_);
     350       389443 : }
     351              : 
     352       300220 : void artdaq::FragmentBuffer::getDataBufferStats(Fragment::fragment_id_t id)
     353              : {
     354       300220 :         if (!dataBuffers_.count(id))
     355              :         {
     356            0 :                 TLOG(TLVL_ERROR) << "DataBufferError: "
     357            0 :                                  << "Error in FragmentBuffer: Cannot get stats of data buffer for ID " << id << " because it does not exist!";
     358            0 :                 throw cet::exception("DataBufferError") << "Error in FragmentBuffer: Cannot get stats of data buffer for ID " << id << " because it does not exist!";
     359              :         }
     360       300220 :         auto dataBuffer = dataBuffers_[id];
     361              : 
     362       300220 :         if (metricMan)
     363              :         {
     364       600440 :                 TLOG(TLVL_GETBUFFERSTATS) << "getDataBufferStats: Sending Metrics";
     365      2101540 :                 metricMan->sendMetric("Buffer Depth Fragments", dataBuffer->DataBufferDepthFragments.load(), "fragments", 1, MetricMode::LastPoint);
     366      2101540 :                 metricMan->sendMetric("Buffer Depth Bytes", dataBuffer->DataBufferDepthBytes.load(), "bytes", 1, MetricMode::LastPoint);
     367              : 
     368       300220 :                 auto bufferDepthFragmentsPercent = dataBuffer->DataBufferDepthFragments.load() * 100 / static_cast<double>(maxDataBufferDepthFragments_);
     369       300220 :                 auto bufferDepthBytesPercent = dataBuffer->DataBufferDepthBytes.load() * 100 / static_cast<double>(maxDataBufferDepthBytes_);
     370      1801320 :                 metricMan->sendMetric("Fragment Buffer Full %Fragments", bufferDepthFragmentsPercent, "%", 3, MetricMode::LastPoint);
     371      1801318 :                 metricMan->sendMetric("Fragment Buffer Full %Bytes", bufferDepthBytesPercent, "%", 3, MetricMode::LastPoint);
     372      2101535 :                 metricMan->sendMetric("Fragment Buffer Full %", bufferDepthFragmentsPercent > bufferDepthBytesPercent ? bufferDepthFragmentsPercent : bufferDepthBytesPercent, "%", 1, MetricMode::LastPoint);
     373              :         }
     374       600440 :         TLOG(TLVL_GETBUFFERSTATS) << "getDataBufferStats: frags=" << dataBuffer->DataBufferDepthFragments.load() << "/" << maxDataBufferDepthFragments_
     375       300220 :                                   << ", sz=" << dataBuffer->DataBufferDepthBytes.load() << "/" << maxDataBufferDepthBytes_;
     376       300220 : }
     377              : 
     378              : //-----------------------------------------------------------------------------
     379              : // P.Murat: return stat reports as a string
     380              : //-----------------------------------------------------------------------------
     381            0 : std::string artdaq::FragmentBuffer::getStatReport()
     382              : {
     383            0 :         std::ostringstream oss;
     384              : 
     385            0 :         for (auto& it : dataBuffers_)
     386              :         {
     387            0 :                 Fragment::fragment_id_t id = it.first;
     388            0 :                 if (!dataBuffers_.count(id))
     389              :                 {
     390            0 :                         TLOG(TLVL_ERROR) << "DataBufferError: "
     391            0 :                                          << "Error in FragmentBuffer: Cannot get stats of data buffer for ID "
     392            0 :                                          << id << " because it does not exist!";
     393            0 :                         throw cet::exception("DataBufferError") << "Error in FragmentBuffer: Cannot get stats of data buffer for ID "
     394            0 :                                                                 << id << " because it does not exist!";
     395              :                 }
     396              : 
     397            0 :                 auto dataBuffer = dataBuffers_[id];
     398              : 
     399            0 :                 TLOG(TLVL_GETBUFFERSTATS) << "getDataBufferStats: Sending Metrics";
     400              : 
     401            0 :                 int nf = dataBuffer->DataBufferDepthFragments.load();
     402            0 :                 int nb = dataBuffer->DataBufferDepthBytes.load();
     403            0 :                 oss << std::endl
     404            0 :                     << "fragment_id:" << id << " nfragments:" << nf << " nbytes:" << nb
     405            0 :                     << " max_nf:" << maxDataBufferDepthFragments_ << " max_nb:" << maxDataBufferDepthBytes_;
     406              : 
     407            0 :                 TLOG(TLVL_GETBUFFERSTATS) << "getDataBufferStats: frags=" << dataBuffer->DataBufferDepthFragments.load() << "/" << maxDataBufferDepthFragments_
     408            0 :                                           << ", sz=" << dataBuffer->DataBufferDepthBytes.load() << "/" << maxDataBufferDepthBytes_;
     409            0 :         }
     410              : 
     411            0 :         return oss.str();
     412            0 : }
     413              : 
     414          231 : void artdaq::FragmentBuffer::checkDataBuffer(Fragment::fragment_id_t id)
     415              : {
     416          231 :         if (!dataBuffers_.count(id))
     417              :         {
     418            0 :                 TLOG(TLVL_ERROR) << "DataBufferError: "
     419            0 :                                  << "Error in FragmentBuffer: Cannot check data buffer for ID " << id << " because it does not exist!";
     420            0 :                 throw cet::exception("DataBufferError") << "Error in FragmentBuffer: Cannot check data buffer for ID " << id << " because it does not exist!";
     421              :         }
     422              : 
     423          231 :         if (dataBuffers_[id]->DataBufferDepthFragments > 0 && mode_ != RequestMode::Single && mode_ != RequestMode::Ignored)
     424              :         {
     425           88 :                 auto dataBuffer = dataBuffers_[id];
     426           88 :                 std::lock_guard<std::mutex> lk(dataBuffer->DataBufferMutex);
     427              : 
     428              :                 // Eliminate extra fragments
     429        50152 :                 while (dataBufferIsTooLarge(id))
     430              :                 {
     431        50064 :                         auto begin = dataBuffer->DataBuffer.begin();
     432       100128 :                         TLOG(TLVL_CHECKDATABUFFER) << "checkDataBuffer: Dropping Fragment with timestamp " << (*begin)->timestamp() << " from data buffer (Buffer over-size)";
     433        50064 :                         dataBuffer->DataBufferDepthBytes -= (*begin)->sizeBytes();
     434        50064 :                         dataBuffer->DataBuffer.erase(begin);
     435        50064 :                         dataBuffer->DataBufferDepthFragments = dataBuffer->DataBuffer.size();
     436        50064 :                         dataBuffer->BufferFragmentKept = false;  // If any Fragments are removed from data buffer, then we know we don't have to ignore the first one anymore
     437              :                 }
     438              : 
     439          176 :                 TLOG(TLVL_CHECKDATABUFFER) << "DataBufferDepthFragments is " << dataBuffer->DataBufferDepthFragments << ", DataBuffer.size is " << dataBuffer->DataBuffer.size();
     440           88 :                 if (dataBuffer->DataBufferDepthFragments > 0 && staleTimeout_ > 0)
     441              :                 {
     442            0 :                         TLOG(TLVL_CHECKDATABUFFER) << "Determining if Fragments can be dropped from data buffer";
     443            0 :                         Fragment::timestamp_t last = dataBuffer->DataBuffer.back()->timestamp();
     444            0 :                         Fragment::timestamp_t min = last > staleTimeout_ ? last - staleTimeout_ : 0;
     445            0 :                         for (auto it = dataBuffer->DataBuffer.begin(); it != dataBuffer->DataBuffer.end();)
     446              :                         {
     447            0 :                                 if ((*it)->timestamp() < min)
     448              :                                 {
     449            0 :                                         TLOG(TLVL_CHECKDATABUFFER) << "checkDataBuffer: Dropping Fragment with timestamp " << (*it)->timestamp() << " from data buffer (timeout=" << staleTimeout_ << ", min=" << min << ")";
     450            0 :                                         dataBuffer->DataBufferDepthBytes -= (*it)->sizeBytes();
     451            0 :                                         dataBuffer->BufferFragmentKept = false;  // If any Fragments are removed from data buffer, then we know we don't have to ignore the first one anymore
     452            0 :                                         it = dataBuffer->DataBuffer.erase(it);
     453            0 :                                         dataBuffer->DataBufferDepthFragments = dataBuffer->DataBuffer.size();
     454              :                                 }
     455              :                                 else
     456              :                                 {
     457            0 :                                         break;
     458              :                                 }
     459              :                         }
     460              :                 }
     461           88 :         }
     462          231 : }
     463              : 
     464            5 : void artdaq::FragmentBuffer::applyRequestsIgnoredMode(artdaq::FragmentPtrs& frags)
     465              : {
     466              :         // dataBuffersMutex_ is held by calling function
     467              :         // We just copy everything that's here into the output.
     468           10 :         TLOG(TLVL_APPLYREQUESTS) << "Mode is Ignored; Copying data to output";
     469           12 :         for (auto& id : dataBuffers_)
     470              :         {
     471            7 :                 std::lock_guard<std::mutex> lk(id.second->DataBufferMutex);
     472            7 :                 if (id.second && !id.second->DataBuffer.empty() && id.second->DataBuffer.back()->sequenceID() >= next_sequence_id_)
     473              :                 {
     474            4 :                         next_sequence_id_ = id.second->DataBuffer.back()->sequenceID() + 1;
     475              :                 }
     476            7 :                 std::move(id.second->DataBuffer.begin(), id.second->DataBuffer.end(), std::inserter(frags, frags.end()));
     477            7 :                 id.second->DataBufferDepthBytes = 0;
     478            7 :                 id.second->DataBufferDepthFragments = 0;
     479            7 :                 id.second->BufferFragmentKept = false;
     480            7 :                 id.second->DataBuffer.clear();
     481            7 :         }
     482            5 : }
     483              : 
     484           11 : void artdaq::FragmentBuffer::applyRequestsSingleMode(artdaq::FragmentPtrs& frags)
     485              : {
     486              :         // We only care about the latest request received. Send empties for all others.
     487           11 :         auto requests = requestBuffer_->GetRequests();
     488           11 :         while (requests.size() > 1)
     489              :         {
     490              :                 // std::map is ordered by key => Last sequence ID in the map is the one we care about
     491            0 :                 requestBuffer_->RemoveRequest(requests.begin()->first);
     492            0 :                 requests.erase(requests.begin());
     493              :         }
     494           11 :         sendEmptyFragments(frags, requests);
     495              : 
     496              :         // If no requests remain after sendEmptyFragments, return
     497           11 :         if (requests.size() == 0 || !requests.count(next_sequence_id_)) return;
     498              : 
     499           28 :         for (auto& id : dataBuffers_)
     500              :         {
     501           18 :                 std::lock_guard<std::mutex> lk(id.second->DataBufferMutex);
     502           18 :                 if (id.second->DataBufferDepthFragments > 0)
     503              :                 {
     504           18 :                         assert(id.second->DataBufferDepthFragments == 1);
     505           36 :                         TLOG(TLVL_APPLYREQUESTS) << "Mode is Single; Sending copy of last event (SeqID " << next_sequence_id_ << ")";
     506           36 :                         for (auto& fragptr : id.second->DataBuffer)
     507              :                         {
     508              :                                 // Return the latest data point
     509           18 :                                 auto frag = fragptr.get();
     510           18 :                                 auto newfrag = std::unique_ptr<artdaq::Fragment>(new Fragment(next_sequence_id_, frag->fragmentID()));
     511           18 :                                 newfrag->resize(frag->size() - detail::RawFragmentHeader::num_words());
     512           18 :                                 memcpy(newfrag->headerAddress(), frag->headerAddress(), frag->sizeBytes());
     513           18 :                                 newfrag->setTimestamp(requests[next_sequence_id_]);
     514           18 :                                 newfrag->setSequenceID(next_sequence_id_);
     515           18 :                                 frags.push_back(std::move(newfrag));
     516           18 :                         }
     517              :                 }
     518              :                 else
     519              :                 {
     520            0 :                         sendEmptyFragment(frags, next_sequence_id_, id.first, "No data for");
     521              :                 }
     522           18 :         }
     523           10 :         requestBuffer_->RemoveRequest(next_sequence_id_);
     524           10 :         ++next_sequence_id_;
     525           11 : }
     526              : 
     527           17 : void artdaq::FragmentBuffer::applyRequestsBufferMode(artdaq::FragmentPtrs& frags)
     528              : {
     529              :         // We only care about the latest request received. Send empties for all others.
     530           17 :         auto requests = requestBuffer_->GetRequests();
     531           17 :         while (requests.size() > 1)
     532              :         {
     533              :                 // std::map is ordered by key => Last sequence ID in the map is the one we care about
     534            0 :                 requestBuffer_->RemoveRequest(requests.begin()->first);
     535            0 :                 requests.erase(requests.begin());
     536              :         }
     537           17 :         sendEmptyFragments(frags, requests);
     538              : 
     539              :         // If no requests remain after sendEmptyFragments, return
     540           17 :         if (requests.size() == 0 || !requests.count(next_sequence_id_)) return;
     541              : 
     542           48 :         for (auto& id : dataBuffers_)
     543              :         {
     544           62 :                 TLOG(TLVL_APPLYREQUESTS) << "applyRequestsBufferMode: Creating ContainerFragment for Buffered Fragments (SeqID " << next_sequence_id_ << ")";
     545           31 :                 frags.emplace_back(new artdaq::Fragment(next_sequence_id_, id.first));
     546           31 :                 frags.back()->setTimestamp(requests[next_sequence_id_]);
     547           31 :                 ContainerFragmentLoader cfl(*frags.back());
     548           31 :                 cfl.set_missing_data(false);  // Buffer mode is never missing data, even if there IS no data.
     549              : 
     550              :                 // If we kept a Fragment from the previous iteration, but more data has arrived, discard it
     551           31 :                 std::lock_guard<std::mutex> lk(id.second->DataBufferMutex);
     552           31 :                 if (id.second->BufferFragmentKept && id.second->DataBufferDepthFragments > 1)
     553              :                 {
     554            1 :                         id.second->DataBufferDepthBytes -= id.second->DataBuffer.front()->sizeBytes();
     555            1 :                         id.second->DataBuffer.erase(id.second->DataBuffer.begin());
     556            1 :                         id.second->DataBufferDepthFragments = id.second->DataBuffer.size();
     557              :                 }
     558              : 
     559              :                 // Buffer mode TFGs should simply copy out the whole dataBuffer_ into a ContainerFragment
     560           31 :                 FragmentPtrs fragsToAdd;
     561           31 :                 std::move(id.second->DataBuffer.begin(), --id.second->DataBuffer.end(), std::back_inserter(fragsToAdd));
     562           31 :                 id.second->DataBuffer.erase(id.second->DataBuffer.begin(), --id.second->DataBuffer.end());
     563              : 
     564           31 :                 if (fragsToAdd.size() > 0)
     565              :                 {
     566           26 :                         TLOG(TLVL_APPLYREQUESTS) << "applyRequestsBufferMode: Adding " << fragsToAdd.size() << " Fragments to Container (SeqID " << next_sequence_id_ << ")";
     567           13 :                         cfl.addFragments(fragsToAdd);
     568              :                 }
     569              :                 else
     570              :                 {
     571           36 :                         TLOG(TLVL_APPLYREQUESTS) << "applyRequestsBufferMode: No Fragments to add (SeqID " << next_sequence_id_ << ")";
     572              :                 }
     573              : 
     574           31 :                 if (id.second->DataBuffer.size() == 1)
     575              :                 {
     576           46 :                         TLOG(TLVL_APPLYREQUESTS) << "applyRequestsBufferMode: Adding Fragment with timestamp " << id.second->DataBuffer.front()->timestamp() << " to Container with sequence ID " << next_sequence_id_;
     577           23 :                         cfl.addFragment(id.second->DataBuffer.front());
     578           23 :                         if (bufferModeKeepLatest_)
     579              :                         {
     580            3 :                                 id.second->BufferFragmentKept = true;
     581            3 :                                 id.second->DataBufferDepthBytes = id.second->DataBuffer.front()->sizeBytes();
     582            3 :                                 id.second->DataBufferDepthFragments = id.second->DataBuffer.size();  // 1
     583              :                         }
     584              :                         else
     585              :                         {
     586           20 :                                 id.second->DataBuffer.clear();
     587           20 :                                 id.second->BufferFragmentKept = false;
     588           20 :                                 id.second->DataBufferDepthBytes = 0;
     589           20 :                                 id.second->DataBufferDepthFragments = 0;
     590              :                         }
     591              :                 }
     592           31 :         }
     593           17 :         requestBuffer_->RemoveRequest(next_sequence_id_);
     594           17 :         ++next_sequence_id_;
     595           17 : }
     596              : 
     597       300053 : void artdaq::FragmentBuffer::applyRequestsWindowMode_CheckAndFillDataBuffer(artdaq::FragmentPtrs& frags, artdaq::Fragment::fragment_id_t id, artdaq::Fragment::sequence_id_t seq, artdaq::Fragment::timestamp_t ts)
     598              : {
     599       300053 :         auto dataBuffer = dataBuffers_[id];
     600              : 
     601       600106 :         TLOG(TLVL_APPLYREQUESTS) << "applyRequestsWindowMode_CheckAndFillDataBuffer: Checking that data exists for request window " << seq;
     602       300053 :         Fragment::timestamp_t min = ts > windowOffset_ ? ts - windowOffset_ : 0;
     603       300053 :         Fragment::timestamp_t max = ts + windowWidth_ > windowOffset_ ? ts + windowWidth_ - windowOffset_ : 1;
     604              : 
     605       600106 :         TLOG(TLVL_APPLYREQUESTS) << "ApplyRequestsWindowsMode_CheckAndFillDataBuffer: min is " << min << ", max is " << max
     606            0 :                                  << " and first/last points in buffer are " << (dataBuffer->DataBufferDepthFragments > 0 ? dataBuffer->DataBuffer.front()->timestamp() : 0)
     607            0 :                                  << "/" << (dataBuffer->DataBufferDepthFragments > 0 ? dataBuffer->DataBuffer.back()->timestamp() : 0)
     608            0 :                                  << " (sz=" << dataBuffer->DataBufferDepthFragments << " [" << dataBuffer->DataBufferDepthBytes.load()
     609       300053 :                                  << "/" << maxDataBufferDepthBytes_ << "])";
     610       300053 :         bool windowClosed = dataBuffer->DataBufferDepthFragments > 0 && dataBuffer->DataBuffer.back()->timestamp() >= max;
     611       300053 :         bool windowTimeout = !windowClosed && TimeUtils::GetElapsedTimeMicroseconds(requestBuffer_->GetRequestTime(seq)) > window_close_timeout_us_;
     612       300053 :         if (windowTimeout)
     613              :         {
     614           21 :                 TLOG(TLVL_WARNING) << "applyRequestsWindowMode_CheckAndFillDataBuffer: A timeout occurred waiting for data to close the request window ({" << min << "-" << max
     615            7 :                                    << "}, buffer={" << (dataBuffer->DataBufferDepthFragments > 0 ? dataBuffer->DataBuffer.front()->timestamp() : 0) << "-"
     616            7 :                                    << (dataBuffer->DataBufferDepthFragments > 0 ? dataBuffer->DataBuffer.back()->timestamp() : 0)
     617            7 :                                    << "} ). Time waiting: "
     618           14 :                                    << TimeUtils::GetElapsedTimeMicroseconds(requestBuffer_->GetRequestTime(seq)) << " us "
     619           14 :                                    << "(> " << window_close_timeout_us_ << " us).";
     620              :         }
     621       300053 :         if (windowClosed || windowTimeout)
     622              :         {
     623       600074 :                 TLOG(TLVL_APPLYREQUESTS) << "applyRequestsWindowMode_CheckAndFillDataBuffer: Creating ContainerFragment for Window-requested Fragments (SeqID " << seq << ")";
     624       300037 :                 frags.emplace_back(new artdaq::Fragment(seq, id));
     625       300037 :                 frags.back()->setTimestamp(ts);
     626       300037 :                 ContainerFragmentLoader cfl(*frags.back());
     627              : 
     628              :                 // In the spirit of NOvA's MegaPool: (RS = Request start (min), RE = Request End (max))
     629              :                 //  --- | Buffer Start | --- | Buffer End | ---
     630              :                 // 1. RS RE |           |     |            |
     631              :                 // 2. RS |              |  RE |            |
     632              :                 // 3. RS |              |     |            | RE
     633              :                 // 4.    |              | RS RE |          |
     634              :                 // 5.    |              | RS  |            | RE
     635              :                 // 6.    |              |     |            | RS RE
     636              :                 //
     637              :                 // If RE (or RS) is after the end of the buffer, we wait for window_close_timeout_us_. If we're here, then that means that windowClosed is false, and the missing_data flag should be set.
     638              :                 // If RS (or RE) is before the start of the buffer, then missing_data should be set to true, as data is assumed to arrive in the buffer in timestamp order
     639              :                 // If the dataBuffer has size 0, then windowClosed will be false
     640       300037 :                 if (!windowClosed || (dataBuffer->DataBufferDepthFragments > 0 && dataBuffer->DataBuffer.front()->timestamp() > min))
     641              :                 {
     642       100028 :                         TLOG(TLVL_DEBUG + 32) << "applyRequestsWindowMode_CheckAndFillDataBuffer: Request window starts before and/or ends after the current data buffer, setting ContainerFragment's missing_data flag!"
     643            0 :                                               << " (requestWindowRange=[" << min << "," << max << "], "
     644            0 :                                               << "buffer={" << (dataBuffer->DataBufferDepthFragments > 0 ? dataBuffer->DataBuffer.front()->timestamp() : 0) << "-"
     645        50014 :                                               << (dataBuffer->DataBufferDepthFragments > 0 ? dataBuffer->DataBuffer.back()->timestamp() : 0) << "} (SeqID " << seq << ")";
     646        50014 :                         cfl.set_missing_data(true);
     647              :                 }
     648              : 
     649       300037 :                 auto it = dataBuffer->DataBuffer.begin();
     650              :                 // Likely that it will be closer to the end...
     651       300037 :                 if (windowTimeout)
     652              :                 {
     653            7 :                         it = dataBuffer->DataBuffer.end();
     654            7 :                         --it;
     655            8 :                         while (it != dataBuffer->DataBuffer.begin())
     656              :                         {
     657            2 :                                 if ((*it)->timestamp() < min)
     658              :                                 {
     659            1 :                                         break;
     660              :                                 }
     661            1 :                                 --it;
     662              :                         }
     663              :                 }
     664              : 
     665       300037 :                 FragmentPtrs fragsToAdd;
     666              :                 // Do a little bit more work to decide which fragments to send for a given request
     667       550109 :                 for (; it != dataBuffer->DataBuffer.end();)
     668              :                 {
     669       550088 :                         Fragment::timestamp_t fragT = (*it)->timestamp();
     670       550088 :                         if (fragT < min)
     671              :                         {
     672           34 :                                 ++it;
     673           34 :                                 continue;
     674              :                         }
     675       550054 :                         if (fragT > max || (fragT == max && windowWidth_ > 0))
     676              :                         {
     677              :                                 break;
     678              :                         }
     679              : 
     680       500076 :                         TLOG(TLVL_APPLYREQUESTS_VERBOSE) << "applyRequestsWindowMode_CheckAndFillDataBuffer: Adding Fragment with timestamp " << (*it)->timestamp() << " to Container (SeqID " << seq << ")";
     681       250038 :                         if (uniqueWindows_)
     682              :                         {
     683       250038 :                                 dataBuffer->DataBufferDepthBytes -= (*it)->sizeBytes();
     684       250038 :                                 fragsToAdd.emplace_back(std::move(*it));
     685       250038 :                                 it = dataBuffer->DataBuffer.erase(it);
     686              :                         }
     687              :                         else
     688              :                         {
     689            0 :                                 fragsToAdd.emplace_back(it->get());
     690            0 :                                 ++it;
     691              :                         }
     692              :                 }
     693              : 
     694       300037 :                 if (fragsToAdd.size() > 0)
     695              :                 {
     696       500054 :                         TLOG(TLVL_APPLYREQUESTS) << "applyRequestsWindowMode_CheckAndFillDataBuffer: Adding " << fragsToAdd.size() << " Fragments to Container (SeqID " << seq << ")";
     697       250027 :                         cfl.addFragments(fragsToAdd);
     698              : 
     699              :                         // Don't delete Fragments which are still in the Fragment buffer
     700       250027 :                         if (!uniqueWindows_)
     701              :                         {
     702            0 :                                 for (auto& frag : fragsToAdd)
     703              :                                 {
     704            0 :                                         frag.release();
     705              :                                 }
     706              :                         }
     707       250027 :                         fragsToAdd.clear();
     708              :                 }
     709              :                 else
     710              :                 {
     711       100020 :                         TLOG(error_on_empty_ ? TLVL_ERROR : TLVL_APPLYREQUESTS) << "applyRequestsWindowMode_CheckAndFillDataBuffer: No Fragments match request (SeqID " << seq << ", window " << min << " - " << max << ")";
     712              :                 }
     713              : 
     714       300037 :                 dataBuffer->DataBufferDepthFragments = dataBuffer->DataBuffer.size();
     715       300037 :                 dataBuffer->WindowsSent[seq] = std::chrono::steady_clock::now();
     716       300037 :                 if (seq > dataBuffer->HighestRequestSeen) dataBuffer->HighestRequestSeen = seq;
     717       300037 :         }
     718       300053 : }
     719              : 
     720           41 : void artdaq::FragmentBuffer::applyRequestsWindowMode(artdaq::FragmentPtrs& frags)
     721              : {
     722           82 :         TLOG(TLVL_APPLYREQUESTS) << "applyRequestsWindowMode BEGIN";
     723              : 
     724           41 :         auto requests = requestBuffer_->GetRequests();
     725              : 
     726           82 :         TLOG(TLVL_APPLYREQUESTS) << "applyRequestsWindowMode: Starting request processing for " << requests.size() << " requests";
     727       300076 :         for (auto req = requests.begin(); req != requests.end();)
     728              :         {
     729       600070 :                 TLOG(TLVL_APPLYREQUESTS) << "applyRequestsWindowMode: processing request with sequence ID " << req->first << ", timestamp " << req->second;
     730              : 
     731       300035 :                 while (req->first < next_sequence_id_ && requests.size() > 0)
     732              :                 {
     733            0 :                         TLOG(TLVL_APPLYREQUESTS_VERBOSE) << "applyRequestsWindowMode: Clearing passed request for sequence ID " << req->first;
     734            0 :                         requestBuffer_->RemoveRequest(req->first);
     735            0 :                         req = requests.erase(req);
     736              :                 }
     737       300035 :                 if (requests.size() == 0) break;
     738              : 
     739       300035 :                 auto ts = req->second;
     740       300035 :                 if (ts == Fragment::InvalidTimestamp)
     741              :                 {
     742            0 :                         TLOG(TLVL_ERROR) << "applyRequestsWindowMode: Received InvalidTimestamp in request " << req->first << ", cannot apply! Check that push-mode BRs are filling appropriate timestamps in their Fragments!";
     743            0 :                         req = requests.erase(req);
     744            0 :                         continue;
     745            0 :                 }
     746              : 
     747       600090 :                 for (auto& id : dataBuffers_)
     748              :                 {
     749       300055 :                         std::lock_guard<std::mutex> lk(id.second->DataBufferMutex);
     750       300055 :                         if (!id.second->WindowsSent.count(req->first))
     751              :                         {
     752       300053 :                                 applyRequestsWindowMode_CheckAndFillDataBuffer(frags, id.first, req->first, req->second);
     753              :                         }
     754       300055 :                 }
     755       300035 :                 checkSentWindows(req->first);
     756       300035 :                 ++req;
     757              :         }
     758              : 
     759              :         // Check sent windows for requests that can be removed
     760           41 :         std::set<artdaq::Fragment::sequence_id_t> seqs;
     761          102 :         for (auto& id : dataBuffers_)
     762              :         {
     763           61 :                 std::lock_guard<std::mutex> lk(id.second->DataBufferMutex);
     764           83 :                 for (auto& seq : id.second->WindowsSent)
     765              :                 {
     766           22 :                         seqs.insert(seq.first);
     767              :                 }
     768           61 :         }
     769           52 :         for (auto& seq : seqs)
     770              :         {
     771           11 :                 checkSentWindows(seq);
     772              :         }
     773           41 : }
     774              : 
     775           10 : void artdaq::FragmentBuffer::applyRequestsSequenceIDMode(artdaq::FragmentPtrs& frags)
     776              : {
     777           20 :         TLOG(TLVL_APPLYREQUESTS) << "applyRequestsSequenceIDMode BEGIN";
     778              : 
     779           10 :         auto requests = requestBuffer_->GetRequests();
     780              : 
     781           20 :         TLOG(TLVL_APPLYREQUESTS) << "applyRequestsSequenceIDMode: Starting request processing";
     782           20 :         for (auto req = requests.begin(); req != requests.end();)
     783              :         {
     784           20 :                 TLOG(TLVL_APPLYREQUESTS) << "applyRequestsSequenceIDMode: Checking that data exists for request SequenceID " << req->first;
     785              : 
     786           30 :                 for (auto& id : dataBuffers_)
     787              :                 {
     788           20 :                         std::lock_guard<std::mutex> lk(id.second->DataBufferMutex);
     789           20 :                         if (!id.second->WindowsSent.count(req->first))
     790              :                         {
     791           40 :                                 TLOG(TLVL_APPLYREQUESTS_VERBOSE) << "Searching id " << id.first << " for Fragments with Sequence ID " << req->first;
     792           40 :                                 for (auto it = id.second->DataBuffer.begin(); it != id.second->DataBuffer.end();)
     793              :                                 {
     794           20 :                                         auto seq = (*it)->sequenceID();
     795           40 :                                         TLOG(TLVL_APPLYREQUESTS_VERBOSE) << "applyRequestsSequenceIDMode: Fragment SeqID " << seq << ", request ID " << req->first;
     796           20 :                                         if (seq == req->first)
     797              :                                         {
     798           32 :                                                 TLOG(TLVL_APPLYREQUESTS_VERBOSE) << "applyRequestsSequenceIDMode: Adding Fragment to output";
     799           16 :                                                 id.second->WindowsSent[req->first] = std::chrono::steady_clock::now();
     800           16 :                                                 id.second->DataBufferDepthBytes -= (*it)->sizeBytes();
     801           16 :                                                 frags.push_back(std::move(*it));
     802           16 :                                                 it = id.second->DataBuffer.erase(it);
     803           16 :                                                 id.second->DataBufferDepthFragments = id.second->DataBuffer.size();
     804              :                                         }
     805              :                                         else
     806              :                                         {
     807            4 :                                                 ++it;
     808              :                                         }
     809              :                                 }
     810              :                         }
     811           20 :                         if (req->first > id.second->HighestRequestSeen) id.second->HighestRequestSeen = req->first;
     812           20 :                 }
     813           10 :                 checkSentWindows(req->first);
     814           10 :                 ++req;
     815              :         }
     816              : 
     817              :         // Check sent windows for requests that can be removed
     818           10 :         std::set<artdaq::Fragment::sequence_id_t> seqs;
     819           30 :         for (auto& id : dataBuffers_)
     820              :         {
     821           20 :                 std::lock_guard<std::mutex> lk(id.second->DataBufferMutex);
     822           28 :                 for (auto& seq : id.second->WindowsSent)
     823              :                 {
     824            8 :                         seqs.insert(seq.first);
     825              :                 }
     826           20 :         }
     827           14 :         for (auto& seq : seqs)
     828              :         {
     829            4 :                 checkSentWindows(seq);
     830              :         }
     831           10 : }
     832              : 
     833           86 : bool artdaq::FragmentBuffer::applyRequests(artdaq::FragmentPtrs& frags)
     834              : {
     835           86 :         if (check_stop())
     836              :         {
     837            2 :                 return false;
     838              :         }
     839              : 
     840              :         // Wait for data, if in ignored mode, or a request otherwise
     841           84 :         if (mode_ == RequestMode::Ignored)
     842              :         {
     843            5 :                 auto start_time = std::chrono::steady_clock::now();
     844          104 :                 while (dataBufferFragmentCount_() == 0 && TimeUtils::GetElapsedTime(start_time) < 1.0)
     845              :                 {
     846           99 :                         if (check_stop()) return false;
     847           99 :                         std::unique_lock<std::mutex> lock(dataConditionMutex_);
     848          297 :                         dataCondition_.wait_for(lock, std::chrono::milliseconds(10), [this]() { return dataBufferFragmentCount_() > 0; });
     849           99 :                 }
     850              :         }
     851           79 :         else if (requestBuffer_ == nullptr)
     852              :         {
     853            0 :                 TLOG(TLVL_ERROR) << "Request Buffer must be set (via SetRequestBuffer) before applyRequests/getData can be called!";
     854            0 :                 return false;
     855              :         }
     856              :         else
     857              :         {
     858           79 :                 if ((check_stop() && requestBuffer_->size() == 0)) return false;
     859              : 
     860           79 :                 std::unique_lock<std::mutex> lock(dataConditionMutex_);
     861           79 :                 dataCondition_.wait_for(lock, std::chrono::milliseconds(10));
     862              : 
     863           79 :                 checkDataBuffers();
     864              : 
     865              :                 // Wait up to 1000 ms for a request...
     866           79 :                 auto counter = 0;
     867              : 
     868          179 :                 while (requestBuffer_->size() == 0 && counter < 100)
     869              :                 {
     870          100 :                         if (check_stop()) return false;
     871              : 
     872          100 :                         checkDataBuffers();
     873              : 
     874          100 :                         requestBuffer_->WaitForRequests(10);  // milliseconds
     875          100 :                         counter++;
     876              :                 }
     877           79 :         }
     878              : 
     879          168 :         if (systemFragmentCount_.load() > 0)
     880              :         {
     881            0 :                 std::lock_guard<std::mutex> lk(systemFragmentMutex_);
     882            0 :                 TLOG(TLVL_INFO) << "Copying " << systemFragmentCount_.load() << " System Fragments into output";
     883              : 
     884            0 :                 std::move(systemFragments_.begin(), systemFragments_.end(), std::inserter(frags, frags.end()));
     885            0 :                 systemFragments_.clear();
     886            0 :                 systemFragmentCount_ = 0;
     887            0 :         }
     888              : 
     889           84 :         switch (mode_)
     890              :         {
     891           11 :                 case RequestMode::Single:
     892           11 :                         applyRequestsSingleMode(frags);
     893           11 :                         break;
     894           41 :                 case RequestMode::Window:
     895           41 :                         applyRequestsWindowMode(frags);
     896           41 :                         break;
     897           17 :                 case RequestMode::Buffer:
     898           17 :                         applyRequestsBufferMode(frags);
     899           17 :                         break;
     900           10 :                 case RequestMode::SequenceID:
     901           10 :                         applyRequestsSequenceIDMode(frags);
     902           10 :                         break;
     903            5 :                 case RequestMode::Ignored:
     904              :                 default:
     905            5 :                         applyRequestsIgnoredMode(frags);
     906            5 :                         break;
     907              :         }
     908              : 
     909           84 :         getDataBuffersStats();
     910              : 
     911           84 :         if (frags.size() > 0)
     912          138 :                 TLOG(TLVL_APPLYREQUESTS) << "Finished Processing requests, returning " << frags.size() << " fragments, current ev_counter is " << next_sequence_id_;
     913           84 :         return true;
     914              : }
     915              : 
     916           13 : bool artdaq::FragmentBuffer::sendEmptyFragment(artdaq::FragmentPtrs& frags, size_t seqId, Fragment::fragment_id_t fragmentId, std::string desc)
     917              : {
     918           26 :         TLOG(TLVL_EMPTYFRAGMENT) << desc << " sequence ID " << seqId << ", sending empty fragment";
     919           13 :         auto frag = new Fragment();
     920           13 :         frag->setSequenceID(seqId);
     921           13 :         frag->setFragmentID(fragmentId);
     922           13 :         frag->setSystemType(Fragment::EmptyFragmentType);
     923           13 :         frags.emplace_back(FragmentPtr(frag));
     924           13 :         return true;
     925              : }
     926              : 
     927           28 : void artdaq::FragmentBuffer::sendEmptyFragments(artdaq::FragmentPtrs& frags, std::map<Fragment::sequence_id_t, Fragment::timestamp_t>& requests)
     928              : {
     929           28 :         if (requests.size() > 0)
     930              :         {
     931           54 :                 TLOG(TLVL_SENDEMPTYFRAGMENTS) << "Sending Empty Fragments for Sequence IDs from " << next_sequence_id_ << " up to but not including " << requests.begin()->first;
     932           34 :                 while (requests.begin()->first > next_sequence_id_)
     933              :                 {
     934            7 :                         if (sendMissingFragments_)
     935              :                         {
     936           20 :                                 for (auto& fid : dataBuffers_)
     937              :                                 {
     938           26 :                                         sendEmptyFragment(frags, next_sequence_id_, fid.first, "Missed request for");
     939              :                                 }
     940              :                         }
     941            7 :                         ++next_sequence_id_;
     942              :                 }
     943              :         }
     944           28 : }
     945              : 
     946       300060 : void artdaq::FragmentBuffer::checkSentWindows(artdaq::Fragment::sequence_id_t seq)
     947              : {
     948       600120 :         TLOG(TLVL_CHECKWINDOWS) << "checkSentWindows: Checking if request " << seq << " can be removed from request list";
     949       300060 :         bool seqComplete = true;
     950       300060 :         bool seqTimeout = false;
     951       600166 :         for (auto& id : dataBuffers_)
     952              :         {
     953       300106 :                 std::lock_guard<std::mutex> lk(id.second->DataBufferMutex);
     954       300106 :                 if (!id.second->WindowsSent.count(seq) || id.second->HighestRequestSeen < seq)
     955              :                 {
     956           21 :                         seqComplete = false;
     957              :                 }
     958       300106 :                 if (id.second->WindowsSent.count(seq) && TimeUtils::GetElapsedTimeMicroseconds(id.second->WindowsSent[seq]) > missing_request_window_timeout_us_)
     959              :                 {
     960            4 :                         seqTimeout = true;
     961              :                 }
     962       300106 :         }
     963       300060 :         if (seqComplete)
     964              :         {
     965       600090 :                 TLOG(TLVL_CHECKWINDOWS) << "checkSentWindows: Request " << seq << " is complete, removing from requestBuffer_.";
     966       300045 :                 requestBuffer_->RemoveRequest(seq);
     967              : 
     968       300045 :                 if (next_sequence_id_ == seq)
     969              :                 {
     970       600058 :                         TLOG(TLVL_CHECKWINDOWS) << "checkSentWindows: Sequence ID matches ev_counter, incrementing ev_counter (" << next_sequence_id_ << ")";
     971              : 
     972       600078 :                         for (auto& id : dataBuffers_)
     973              :                         {
     974       300049 :                                 std::lock_guard<std::mutex> lk(id.second->DataBufferMutex);
     975       300049 :                                 id.second->WindowsSent.erase(seq);
     976       300049 :                         }
     977              : 
     978       300029 :                         ++next_sequence_id_;
     979              :                 }
     980              :         }
     981       300060 :         if (seqTimeout)
     982              :         {
     983            4 :                 TLOG(TLVL_CHECKWINDOWS) << "checkSentWindows: Sent Window history indicates that requests between " << next_sequence_id_ << " and " << seq << " have timed out.";
     984            6 :                 while (next_sequence_id_ <= seq)
     985              :                 {
     986            6 :                         if (next_sequence_id_ < seq) TLOG(TLVL_CHECKWINDOWS) << "Missed request for sequence ID " << next_sequence_id_ << "! Will not send any data for this sequence ID!";
     987            4 :                         requestBuffer_->RemoveRequest(next_sequence_id_);
     988              : 
     989           12 :                         for (auto& id : dataBuffers_)
     990              :                         {
     991            8 :                                 std::lock_guard<std::mutex> lk(id.second->DataBufferMutex);
     992            8 :                                 id.second->WindowsSent.erase(next_sequence_id_);
     993            8 :                         }
     994              : 
     995            4 :                         ++next_sequence_id_;
     996              :                 }
     997              :         }
     998       300060 : }
        

Generated by: LCOV version 2.0-1