LCOV - code coverage report
Current view: top level - artdaq/Application - BoardReaderCore.cc (source / functions) Coverage Total Hit
Test: artdaq.info.cleaned Lines: 0.0 % 411 0
Test Date: 2025-09-04 00:45:34 Functions: 0.0 % 79 0

            Line data    Source code
       1              : #include "artdaq/DAQdata/Globals.hh"  // include these 2 first -
       2              : #define TRACE_NAME (app_name + "_BoardReaderCore").c_str()
       3              : 
       4              : #include "artdaq-core/Core/MonitoredQuantity.hh"
       5              : #include "artdaq-core/Data/Fragment.hh"
       6              : #include "artdaq-core/Utilities/ExceptionHandler.hh"
       7              : #include "artdaq/Application/BoardReaderCore.hh"
       8              : #include "artdaq/Application/TaskType.hh"
       9              : #include "artdaq/Generators/makeCommandableFragmentGenerator.hh"
      10              : 
      11              : #include "cetlib_except/exception.h"
      12              : #include "fhiclcpp/ParameterSet.h"
      13              : 
      14              : #include <boost/lexical_cast.hpp>
      15              : 
      16              : #include <pthread.h>
      17              : #include <sched.h>
      18              : #include <algorithm>
      19              : #include <memory>
      20              : #include <thread>
      21              : 
      22              : const std::string artdaq::BoardReaderCore::
      23              :     FRAGMENTS_PROCESSED_STAT_KEY("BoardReaderCoreFragmentsProcessed");
      24              : const std::string artdaq::BoardReaderCore::
      25              :     INPUT_WAIT_STAT_KEY("BoardReaderCoreInputWaitTime");
      26              : const std::string artdaq::BoardReaderCore::BUFFER_WAIT_STAT_KEY("BoardReaderCoreBufferWaitTime");
      27              : const std::string artdaq::BoardReaderCore::REQUEST_WAIT_STAT_KEY("BoardReaderCoreRequestWaitTime");
      28              : const std::string artdaq::BoardReaderCore::
      29              :     OUTPUT_WAIT_STAT_KEY("BoardReaderCoreOutputWaitTime");
      30              : const std::string artdaq::BoardReaderCore::
      31              :     FRAGMENTS_PER_READ_STAT_KEY("BoardReaderCoreFragmentsPerRead");
      32              : 
      33              : std::unique_ptr<artdaq::DataSenderManager> artdaq::BoardReaderCore::sender_ptr_ = nullptr;
      34              : 
      35            0 : artdaq::BoardReaderCore::BoardReaderCore(Commandable& parent_application)
      36            0 :     : parent_application_(parent_application)
      37              :     /*, local_group_comm_(local_group_comm)*/
      38            0 :     , generator_ptr_(nullptr)
      39            0 :     , run_id_(art::RunID::flushRun())
      40            0 :     , fragment_count_(0)
      41            0 :     , stop_requested_(false)
      42            0 :     , pause_requested_(false)
      43              : {
      44            0 :         TLOG(TLVL_DEBUG + 32) << "Constructor";
      45            0 :         statsHelper_.addMonitoredQuantityName(FRAGMENTS_PROCESSED_STAT_KEY);
      46            0 :         statsHelper_.addMonitoredQuantityName(INPUT_WAIT_STAT_KEY);
      47            0 :         statsHelper_.addMonitoredQuantityName(BUFFER_WAIT_STAT_KEY);
      48            0 :         statsHelper_.addMonitoredQuantityName(REQUEST_WAIT_STAT_KEY);
      49            0 :         statsHelper_.addMonitoredQuantityName(OUTPUT_WAIT_STAT_KEY);
      50            0 :         statsHelper_.addMonitoredQuantityName(FRAGMENTS_PER_READ_STAT_KEY);
      51            0 : }
      52              : 
      53            0 : artdaq::BoardReaderCore::~BoardReaderCore()
      54              : {
      55            0 :         TLOG(TLVL_DEBUG + 32) << "Destructor";
      56            0 :         TLOG(TLVL_DEBUG + 32) << "Stopping Request Receiver BEGIN";
      57            0 :         request_receiver_ptr_.reset(nullptr);
      58            0 :         TLOG(TLVL_DEBUG + 32) << "Stopping Request Receiver END";
      59            0 : }
      60              : 
      61            0 : bool artdaq::BoardReaderCore::initialize(fhicl::ParameterSet const& pset, uint64_t /*unused*/, uint64_t /*unused*/)
      62              : {
      63            0 :         TLOG(TLVL_DEBUG + 32) << "initialize method called with "
      64            0 :                               << "ParameterSet = \"" << pset.to_string() << "\".";
      65              : 
      66              :         // pull out the relevant parts of the ParameterSet
      67            0 :         fhicl::ParameterSet daq_pset;
      68              :         try
      69              :         {
      70            0 :                 daq_pset = pset.get<fhicl::ParameterSet>("daq");
      71              :         }
      72            0 :         catch (...)
      73              :         {
      74            0 :                 TLOG(TLVL_ERROR)
      75            0 :                     << "Unable to find the DAQ parameters in the initialization "
      76            0 :                     << "ParameterSet: \"" + pset.to_string() + "\".";
      77            0 :                 return false;
      78            0 :         }
      79            0 :         fhicl::ParameterSet fr_pset;
      80              :         try
      81              :         {
      82            0 :                 fr_pset = daq_pset.get<fhicl::ParameterSet>("fragment_receiver");
      83            0 :                 data_pset_ = fr_pset;
      84              :         }
      85            0 :         catch (...)
      86              :         {
      87            0 :                 TLOG(TLVL_ERROR)
      88            0 :                     << "Unable to find the fragment_receiver parameters in the DAQ "
      89            0 :                     << "initialization ParameterSet: \"" + daq_pset.to_string() + "\".";
      90            0 :                 return false;
      91            0 :         }
      92              : 
      93              :         // pull out the Metric part of the ParameterSet
      94            0 :         fhicl::ParameterSet metric_pset;
      95              :         try
      96              :         {
      97            0 :                 metric_pset = daq_pset.get<fhicl::ParameterSet>("metrics");
      98              :         }
      99            0 :         catch (...)
     100            0 :         {}  // OK if there's no metrics table defined in the FHiCL
     101              : 
     102            0 :         if (metric_pset.is_empty())
     103              :         {
     104            0 :                 TLOG(TLVL_INFO) << "No metric plugins appear to be defined";
     105              :         }
     106              :         try
     107              :         {
     108            0 :                 metricMan->initialize(metric_pset, app_name);
     109              :         }
     110            0 :         catch (...)
     111              :         {
     112            0 :                 ExceptionHandler(ExceptionHandlerRethrow::no,
     113              :                                  "Error loading metrics in BoardReaderCore::initialize()");
     114            0 :         }
     115              : 
     116            0 :         if (daq_pset.has_key("rank"))
     117              :         {
     118            0 :                 if (my_rank >= 0 && daq_pset.get<int>("rank") != my_rank)
     119              :                 {
     120            0 :                         TLOG(TLVL_WARNING) << "BoardReader rank specified at startup is different than rank specified at configure! Using rank received at configure!";
     121              :                 }
     122            0 :                 my_rank = daq_pset.get<int>("rank");
     123              :         }
     124            0 :         if (my_rank == -1)
     125              :         {
     126            0 :                 TLOG(TLVL_ERROR) << "BoardReader rank not specified at startup or in configuration! Aborting";
     127            0 :                 throw cet::exception("RankNotSpecifiedError") << "BoardReader rank not specified at startup or in configuration! Aborting";
     128              :         }
     129              : 
     130              :         // create the requested CommandableFragmentGenerator
     131            0 :         auto frag_gen_name = fr_pset.get<std::string>("generator", "");
     132            0 :         if (frag_gen_name.length() == 0)
     133              :         {
     134            0 :                 TLOG(TLVL_ERROR)
     135            0 :                     << "No fragment generator (parameter name = \"generator\") was "
     136            0 :                     << "specified in the fragment_receiver ParameterSet.  The "
     137            0 :                     << "DAQ initialization PSet was \"" << daq_pset.to_string() << "\".";
     138            0 :                 return false;
     139              :         }
     140              : 
     141              :         try
     142              :         {
     143            0 :                 generator_ptr_ = artdaq::makeCommandableFragmentGenerator(frag_gen_name, fr_pset);
     144              :         }
     145            0 :         catch (...)
     146              :         {
     147            0 :                 std::stringstream exception_string;
     148              :                 exception_string << "Exception thrown during initialization of fragment generator of type \""
     149            0 :                                  << frag_gen_name << "\"";
     150              : 
     151            0 :                 ExceptionHandler(ExceptionHandlerRethrow::no, exception_string.str());
     152              : 
     153            0 :                 TLOG(TLVL_DEBUG + 32) << "FHiCL parameter set used to initialize the fragment generator which threw an exception: " << fr_pset.to_string();
     154              : 
     155            0 :                 return false;
     156            0 :         }
     157              : 
     158              :         try
     159              :         {
     160            0 :                 fragment_buffer_ptr_ = std::make_shared<FragmentBuffer>(fr_pset);
     161              :         }
     162            0 :         catch (...)
     163              :         {
     164            0 :                 std::stringstream exception_string;
     165            0 :                 exception_string << "Exception thrown during initialization of Fragment Buffer";
     166              : 
     167            0 :                 ExceptionHandler(ExceptionHandlerRethrow::no, exception_string.str());
     168              : 
     169            0 :                 TLOG(TLVL_DEBUG + 32) << "FHiCL parameter set used to initialize the fragment buffer which threw an exception: " << fr_pset.to_string();
     170              : 
     171            0 :                 return false;
     172            0 :         }
     173              : 
     174            0 :         std::shared_ptr<RequestBuffer> request_buffer = std::make_shared<RequestBuffer>(fr_pset.get<artdaq::Fragment::sequence_id_t>("request_increment", 1));
     175              : 
     176              :         try
     177              :         {
     178            0 :                 request_receiver_ptr_.reset(new RequestReceiver(fr_pset, request_buffer));
     179            0 :                 generator_ptr_->SetRequestBuffer(request_buffer);
     180            0 :                 generator_ptr_->SetFragmentBuffer(fragment_buffer_ptr_);
     181            0 :                 fragment_buffer_ptr_->SetRequestBuffer(request_buffer);
     182              :         }
     183            0 :         catch (...)
     184              :         {
     185            0 :                 ExceptionHandler(ExceptionHandlerRethrow::no, "Exception thrown during initialization of request receiver");
     186              : 
     187            0 :                 TLOG(TLVL_DEBUG + 32) << "FHiCL parameter set used to initialize the request receiver which threw an exception: " << fr_pset.to_string();
     188              : 
     189            0 :                 return false;
     190            0 :         }
     191            0 :         metricMan->setPrefix(generator_ptr_->metricsReportingInstanceName());
     192              : 
     193            0 :         rt_priority_ = fr_pset.get<int>("rt_priority", 0);
     194              : 
     195              :         // fetch the monitoring parameters and create the MonitoredQuantity instances
     196            0 :         statsHelper_.createCollectors(fr_pset, 100, 30.0, 60.0, FRAGMENTS_PER_READ_STAT_KEY);
     197              : 
     198              :         // check if we should skip the sequence ID test...
     199            0 :         skip_seqId_test_ = (fr_pset.get<bool>("skip_seqID_test", false) || generator_ptr_->fragmentIDs().size() > 1 || fragment_buffer_ptr_->request_mode() != RequestMode::Ignored);
     200              : 
     201            0 :         verbose_ = fr_pset.get<bool>("verbose", true);
     202              : 
     203            0 :         return true;
     204            0 : }
     205              : 
     206            0 : bool artdaq::BoardReaderCore::start(art::RunID id, uint64_t timeout, uint64_t timestamp)
     207              : {
     208            0 :         TLOG((verbose_ ? TLVL_INFO : TLVL_DEBUG + 32)) << "Starting run " << id.run();
     209            0 :         stop_requested_.store(false);
     210            0 :         pause_requested_.store(false);
     211              : 
     212            0 :         fragment_count_ = 0;
     213            0 :         prev_seq_id_ = 0;
     214            0 :         statsHelper_.resetStatistics();
     215              : 
     216            0 :         fragment_buffer_ptr_->Reset(false);
     217              : 
     218            0 :         metricMan->do_start();
     219              : 
     220            0 :         TLOG((verbose_ ? TLVL_INFO : TLVL_DEBUG + 32)) << "Pausing for 1s";
     221            0 :         usleep(1000000);
     222              : 
     223            0 :         generator_ptr_->StartCmd(id.run(), timeout, timestamp);
     224            0 :         run_id_ = id;
     225              : 
     226            0 :         request_receiver_ptr_->SetRunNumber(static_cast<uint32_t>(id.run()));
     227            0 :         request_receiver_ptr_->startRequestReception();
     228              : 
     229            0 :         running_ = true;
     230            0 :         TLOG((verbose_ ? TLVL_INFO : TLVL_DEBUG + 32)) << "Completed the Start transition (Started run) for run " << run_id_.run()
     231            0 :                                                        << ", timeout = " << timeout << ", timestamp = " << timestamp;
     232            0 :         return true;
     233              : }
     234              : 
     235            0 : bool artdaq::BoardReaderCore::stop(uint64_t timeout, uint64_t timestamp)
     236              : {
     237            0 :         TLOG((verbose_ ? TLVL_INFO : TLVL_DEBUG + 32)) << "Stopping run " << run_id_.run() << " after " << fragment_count_ << " fragments.";
     238            0 :         stop_requested_.store(true);
     239              : 
     240            0 :         TLOG(TLVL_DEBUG + 32) << "Stopping Request reception BEGIN";
     241            0 :         request_receiver_ptr_->stopRequestReception();
     242            0 :         TLOG(TLVL_DEBUG + 32) << "Stopping Request reception END";
     243              : 
     244            0 :         TLOG(TLVL_DEBUG + 32) << "Stopping CommandableFragmentGenerator BEGIN";
     245            0 :         generator_ptr_->StopCmd(timeout, timestamp);
     246            0 :         TLOG(TLVL_DEBUG + 32) << "Stopping CommandableFragmentGenerator END";
     247              : 
     248            0 :         TLOG(TLVL_DEBUG + 32) << "Stopping FragmentBuffer";
     249            0 :         fragment_buffer_ptr_->Stop();
     250              : 
     251            0 :         TLOG(TLVL_DEBUG + 32) << "Stopping DataSenderManager";
     252            0 :         if (sender_ptr_)
     253              :         {
     254            0 :                 sender_ptr_->StopSender();
     255              :         }
     256              : 
     257            0 :         running_ = false;
     258            0 :         TLOG((verbose_ ? TLVL_INFO : TLVL_DEBUG + 32)) << "Completed the Stop transition for run " << run_id_.run();
     259            0 :         return true;
     260              : }
     261              : 
     262            0 : bool artdaq::BoardReaderCore::pause(uint64_t timeout, uint64_t timestamp)
     263              : {
     264            0 :         TLOG((verbose_ ? TLVL_INFO : TLVL_DEBUG + 32)) << "Pausing run " << run_id_.run() << " after " << fragment_count_ << " fragments.";
     265            0 :         pause_requested_.store(true);
     266            0 :         generator_ptr_->PauseCmd(timeout, timestamp);
     267            0 :         TLOG((verbose_ ? TLVL_INFO : TLVL_DEBUG + 32)) << "Completed the Pause transition for run " << run_id_.run();
     268            0 :         return true;
     269              : }
     270              : 
     271            0 : bool artdaq::BoardReaderCore::resume(uint64_t timeout, uint64_t timestamp)
     272              : {
     273            0 :         TLOG((verbose_ ? TLVL_INFO : TLVL_DEBUG + 32)) << "Resuming run " << run_id_.run();
     274            0 :         pause_requested_.store(false);
     275            0 :         metricMan->do_start();
     276            0 :         generator_ptr_->ResumeCmd(timeout, timestamp);
     277            0 :         TLOG((verbose_ ? TLVL_INFO : TLVL_DEBUG + 32)) << "Completed the Resume transition for run " << run_id_.run();
     278            0 :         return true;
     279              : }
     280              : 
     281            0 : bool artdaq::BoardReaderCore::shutdown(uint64_t /*unused*/)
     282              : {
     283            0 :         TLOG((verbose_ ? TLVL_INFO : TLVL_DEBUG + 32)) << "Starting Shutdown transition";
     284            0 :         generator_ptr_->joinThreads();  // Cleanly shut down the CommandableFragmentGenerator
     285            0 :         generator_ptr_.reset(nullptr);
     286            0 :         metricMan->shutdown();
     287            0 :         TLOG((verbose_ ? TLVL_INFO : TLVL_DEBUG + 32)) << "Completed Shutdown transition";
     288            0 :         return true;
     289              : }
     290              : 
     291            0 : bool artdaq::BoardReaderCore::soft_initialize(fhicl::ParameterSet const& pset, uint64_t timeout, uint64_t timestamp)
     292              : {
     293            0 :         TLOG(TLVL_DEBUG + 32) << "soft_initialize method called with "
     294            0 :                               << "ParameterSet = \"" << pset.to_string()
     295            0 :                               << "\". Forwarding to initialize.";
     296            0 :         return initialize(pset, timeout, timestamp);
     297              : }
     298              : 
     299            0 : bool artdaq::BoardReaderCore::reinitialize(fhicl::ParameterSet const& pset, uint64_t timeout, uint64_t timestamp)
     300              : {
     301            0 :         TLOG(TLVL_DEBUG + 32) << "reinitialize method called with "
     302            0 :                               << "ParameterSet = \"" << pset.to_string()
     303            0 :                               << "\". Forwarding to initalize.";
     304            0 :         return initialize(pset, timeout, timestamp);
     305              : }
     306              : 
     307            0 : void artdaq::BoardReaderCore::receive_fragments()
     308              : {
     309            0 :         if (rt_priority_ > 0)
     310              :         {
     311              : #pragma GCC diagnostic push
     312              : #pragma GCC diagnostic ignored "-Wmissing-field-initializers"
     313            0 :                 sched_param s_param = {};
     314            0 :                 s_param.sched_priority = rt_priority_;
     315            0 :                 if (pthread_setschedparam(pthread_self(), SCHED_RR, &s_param))
     316            0 :                         TLOG(TLVL_WARNING) << "setting realtime priority failed";
     317              : #pragma GCC diagnostic pop
     318              :         }
     319              : 
     320              :         // try-catch block here?
     321              : 
     322              :         // how to turn RT PRI off?
     323            0 :         if (rt_priority_ > 0)
     324              :         {
     325              : #pragma GCC diagnostic push
     326              : #pragma GCC diagnostic ignored "-Wmissing-field-initializers"
     327            0 :                 sched_param s_param = {};
     328            0 :                 s_param.sched_priority = rt_priority_;
     329            0 :                 int status = pthread_setschedparam(pthread_self(), SCHED_RR, &s_param);
     330            0 :                 if (status != 0)
     331              :                 {
     332            0 :                         TLOG(TLVL_ERROR)
     333            0 :                             << "Failed to set realtime priority to " << rt_priority_
     334            0 :                             << ", return code = " << status;
     335              :                 }
     336              : #pragma GCC diagnostic pop
     337              :         }
     338              : 
     339            0 :         TLOG(TLVL_DEBUG + 32) << "Waiting for first fragment.";
     340              :         artdaq::MonitoredQuantityStats::TIME_POINT_T startTime, after_input, after_buffer;
     341            0 :         artdaq::FragmentPtrs frags;
     342              : 
     343            0 :         receiver_thread_active_ = true;
     344              : 
     345            0 :         auto wait_start = std::chrono::steady_clock::now();
     346            0 :         while (!running_ && TimeUtils::GetElapsedTime(wait_start) < start_transition_timeout_)
     347              :         {
     348            0 :                 usleep(10000);
     349              :         }
     350            0 :         if (!running_)
     351              :         {
     352            0 :                 TLOG(TLVL_ERROR) << "Timeout (" << start_transition_timeout_ << " s) while waiting for Start after receive_fragments thread started!";
     353            0 :                 receiver_thread_active_ = false;
     354              :         }
     355              : 
     356            0 :         while (receiver_thread_active_)
     357              :         {
     358            0 :                 startTime = artdaq::MonitoredQuantity::getCurrentTime();
     359              : 
     360            0 :                 TLOG(TLVL_DEBUG + 35) << "receive_fragments getNext start";
     361            0 :                 receiver_thread_active_ = generator_ptr_->getNext(frags);
     362            0 :                 TLOG(TLVL_DEBUG + 35) << "receive_fragments getNext done (receiver_thread_active_=" << receiver_thread_active_ << ")";
     363              : 
     364              :                 // 08-May-2015, KAB & JCF: if the generator getNext() method returns false
     365              :                 // (which indicates that the data flow has stopped) *and* the reason that
     366              :                 // it has stopped is because there was an exception that wasn't handled by
     367              :                 // the experiment-specific FragmentGenerator class, we move to the
     368              :                 // InRunError state so that external observers (e.g. RunControl or
     369              :                 // DAQInterface) can see that there was a problem.
     370            0 :                 if (!receiver_thread_active_ && generator_ptr_ && generator_ptr_->exception())
     371              :                 {
     372            0 :                         parent_application_.in_run_failure();
     373              :                 }
     374              : 
     375            0 :                 after_input = artdaq::MonitoredQuantity::getCurrentTime();
     376              : 
     377            0 :                 if (!receiver_thread_active_) { break; }
     378            0 :                 statsHelper_.addSample(FRAGMENTS_PER_READ_STAT_KEY, frags.size());
     379              : 
     380            0 :                 if (frags.size() > 0)
     381              :                 {
     382            0 :                         TLOG(TLVL_DEBUG + 35) << "receive_fragments AddFragmentsToBuffer start";
     383            0 :                         fragment_buffer_ptr_->AddFragmentsToBuffer(std::move(frags));
     384            0 :                         TLOG(TLVL_DEBUG + 35) << "receive_fragments AddFragmentsToBuffer done";
     385              :                 }
     386              : 
     387            0 :                 after_buffer = artdaq::MonitoredQuantity::getCurrentTime();
     388            0 :                 TLOG(TLVL_DEBUG + 34) << "receive_fragments INPUT_WAIT=" << (after_input - startTime) << ", BUFFER_WAIT=" << (after_buffer - after_input);
     389            0 :                 statsHelper_.addSample(INPUT_WAIT_STAT_KEY, after_input - startTime);
     390            0 :                 statsHelper_.addSample(BUFFER_WAIT_STAT_KEY, after_buffer - after_input);
     391            0 :                 if (statsHelper_.statsRollingWindowHasMoved()) { sendMetrics_(); }
     392            0 :                 frags.clear();
     393              :         }
     394              : 
     395              :         // 11-May-2015, KAB: call MetricManager::do_stop whenever we exit the
     396              :         // processing fragments loop so that metrics correctly go to zero when
     397              :         // there is no data flowing
     398            0 :         metricMan->do_stop();
     399              : 
     400            0 :         TLOG(TLVL_DEBUG + 32) << "receive_fragments loop end";
     401            0 : }
     402            0 : void artdaq::BoardReaderCore::send_fragments()
     403              : {
     404            0 :         if (rt_priority_ > 0)
     405              :         {
     406              : #pragma GCC diagnostic push
     407              : #pragma GCC diagnostic ignored "-Wmissing-field-initializers"
     408            0 :                 sched_param s_param = {};
     409            0 :                 s_param.sched_priority = rt_priority_;
     410            0 :                 if (pthread_setschedparam(pthread_self(), SCHED_RR, &s_param) != 0)
     411              :                 {
     412            0 :                         TLOG(TLVL_WARNING) << "setting realtime priority failed";
     413              :                 }
     414              : #pragma GCC diagnostic pop
     415              :         }
     416              : 
     417              :         // try-catch block here?
     418              : 
     419              :         // how to turn RT PRI off?
     420            0 :         if (rt_priority_ > 0)
     421              :         {
     422              : #pragma GCC diagnostic push
     423              : #pragma GCC diagnostic ignored "-Wmissing-field-initializers"
     424            0 :                 sched_param s_param = {};
     425            0 :                 s_param.sched_priority = rt_priority_;
     426            0 :                 int status = pthread_setschedparam(pthread_self(), SCHED_RR, &s_param);
     427            0 :                 if (status != 0)
     428              :                 {
     429            0 :                         TLOG(TLVL_ERROR)
     430            0 :                             << "Failed to set realtime priority to " << rt_priority_
     431            0 :                             << ", return code = " << status;
     432              :                 }
     433              : #pragma GCC diagnostic pop
     434              :         }
     435              : 
     436            0 :         TLOG(TLVL_DEBUG + 32) << "Initializing DataSenderManager. my_rank=" << my_rank;
     437            0 :         sender_ptr_ = std::make_unique<artdaq::DataSenderManager>(data_pset_);
     438              : 
     439            0 :         TLOG(TLVL_DEBUG + 32) << "Waiting for first fragment.";
     440              :         artdaq::MonitoredQuantityStats::TIME_POINT_T startTime;
     441              :         double delta_time;
     442            0 :         artdaq::FragmentPtrs frags;
     443            0 :         auto targetFragCount = generator_ptr_->fragmentIDs().size();
     444              : 
     445            0 :         sender_thread_active_ = true;
     446              : 
     447            0 :         auto wait_start = std::chrono::steady_clock::now();
     448            0 :         while (!running_ && TimeUtils::GetElapsedTime(wait_start) < start_transition_timeout_)
     449              :         {
     450            0 :                 usleep(10000);
     451              :         }
     452            0 :         if (!running_)
     453              :         {
     454            0 :                 TLOG(TLVL_ERROR) << "Timeout (" << start_transition_timeout_ << " s) while waiting for Start after send_fragments thread started!";
     455            0 :                 sender_thread_active_ = false;
     456              :         }
     457              : 
     458            0 :         while (sender_thread_active_)
     459              :         {
     460            0 :                 startTime = artdaq::MonitoredQuantity::getCurrentTime();
     461              : 
     462            0 :                 TLOG(TLVL_DEBUG + 35) << "send_fragments applyRequests start";
     463            0 :                 sender_thread_active_ = fragment_buffer_ptr_->applyRequests(frags);
     464            0 :                 TLOG(TLVL_DEBUG + 35) << "send_fragments applyRequests done (sender_thread_active_=" << sender_thread_active_ << ")";
     465              :                 // 08-May-2015, KAB & JCF: if the generator getNext() method returns false
     466              :                 // (which indicates that the data flow has stopped) *and* the reason that
     467              :                 // it has stopped is because there was an exception that wasn't handled by
     468              :                 // the experiment-specific FragmentGenerator class, we move to the
     469              :                 // InRunError state so that external observers (e.g. RunControl or
     470              :                 // DAQInterface) can see that there was a problem.
     471            0 :                 if (!sender_thread_active_ && generator_ptr_ && generator_ptr_->exception())
     472              :                 {
     473            0 :                         parent_application_.in_run_failure();
     474              :                 }
     475              : 
     476            0 :                 delta_time = artdaq::MonitoredQuantity::getCurrentTime() - startTime;
     477              : 
     478            0 :                 TLOG(TLVL_DEBUG + 34) << "send_fragments REQUEST_WAIT=" << delta_time;
     479            0 :                 statsHelper_.addSample(REQUEST_WAIT_STAT_KEY, delta_time);
     480              : 
     481            0 :                 if (!sender_thread_active_) { break; }
     482              : 
     483            0 :                 for (auto& fragPtr : frags)
     484              :                 {
     485            0 :                         if (fragPtr == nullptr)
     486              :                         {
     487            0 :                                 TLOG(TLVL_WARNING) << "Encountered a bad fragment pointer in fragment " << fragment_count_ << ". "
     488            0 :                                                    << "This is most likely caused by a problem with the Fragment Generator!";
     489            0 :                                 continue;
     490            0 :                         }
     491            0 :                         if (fragment_count_ == 0)
     492              :                         {
     493            0 :                                 TLOG(TLVL_DEBUG + 32) << "Received first Fragment from Fragment Generator, sequence ID " << fragPtr->sequenceID() << ", size = " << fragPtr->sizeBytes() << " bytes.";
     494              :                         }
     495              : 
     496            0 :                         if (artdaq::Fragment::isBroadcastFragmentType(fragPtr->type()))
     497              :                         {
     498              :                                 // Just broadcast any system Fragments in the output
     499            0 :                                 artdaq::Fragment::sequence_id_t sequence_id = fragPtr->sequenceID();
     500            0 :                                 statsHelper_.addSample(FRAGMENTS_PROCESSED_STAT_KEY, fragPtr->sizeBytes());
     501              : 
     502            0 :                                 startTime = artdaq::MonitoredQuantity::getCurrentTime();
     503            0 :                                 TLOG(TLVL_DEBUG + 36) << "send_fragments seq=" << sequence_id << " sendFragment start";
     504            0 :                                 auto res = sender_ptr_->sendFragment(std::move(*fragPtr));
     505            0 :                                 TLOG(TLVL_DEBUG + 36) << "send_fragments seq=" << sequence_id << " sendFragment done (dest=" << res.first << ", sts=" << TransferInterface::CopyStatusToString(res.second) << ")";
     506            0 :                                 ++fragment_count_;
     507            0 :                                 statsHelper_.addSample(OUTPUT_WAIT_STAT_KEY,
     508            0 :                                                        artdaq::MonitoredQuantity::getCurrentTime() - startTime);
     509            0 :                                 continue;
     510            0 :                         }
     511              : 
     512            0 :                         artdaq::Fragment::sequence_id_t sequence_id = fragPtr->sequenceID();
     513            0 :                         Globals::SetMFIteration("Sequence ID " + std::to_string(sequence_id));
     514            0 :                         statsHelper_.addSample(FRAGMENTS_PROCESSED_STAT_KEY, fragPtr->sizeBytes());
     515              : 
     516              :                         /*if ((fragment_count_ % 250) == 0)
     517              :                         {
     518              :                             TLOG(TLVL_DEBUG + 32)
     519              :                                 << "Sending fragment " << fragment_count_
     520              :                                 << " with sequence id " << sequence_id << ".";
     521              :                         }*/
     522              : 
     523              :                         // check for continuous sequence IDs
     524            0 :                         if (!skip_seqId_test_ && abs(static_cast<int64_t>(sequence_id) - static_cast<int64_t>(prev_seq_id_)) > 1)
     525              :                         {
     526            0 :                                 TLOG(TLVL_WARNING)
     527            0 :                                     << "Missing sequence IDs: current sequence ID = "
     528            0 :                                     << sequence_id << ", previous sequence ID = "
     529            0 :                                     << prev_seq_id_ << ".";
     530              :                         }
     531            0 :                         prev_seq_id_ = sequence_id;
     532              : 
     533            0 :                         startTime = artdaq::MonitoredQuantity::getCurrentTime();
     534            0 :                         TLOG(TLVL_DEBUG + 36) << "send_fragments seq=" << sequence_id << " sendFragment start";
     535            0 :                         auto res = sender_ptr_->sendFragment(std::move(*fragPtr));
     536            0 :                         if (sender_ptr_->GetSentSequenceIDCount(sequence_id) == targetFragCount)
     537              :                         {
     538            0 :                                 sender_ptr_->RemoveRoutingTableEntry(sequence_id);
     539              :                         }
     540            0 :                         TLOG(TLVL_DEBUG + 36) << "send_fragments seq=" << sequence_id << " sendFragment done (dest=" << res.first << ", sts=" << TransferInterface::CopyStatusToString(res.second) << ")";
     541            0 :                         ++fragment_count_;
     542            0 :                         statsHelper_.addSample(OUTPUT_WAIT_STAT_KEY,
     543            0 :                                                artdaq::MonitoredQuantity::getCurrentTime() - startTime);
     544              : 
     545            0 :                         bool readyToReport = statsHelper_.readyToReport();
     546            0 :                         if (readyToReport)
     547              :                         {
     548            0 :                                 TLOG(TLVL_INFO) << buildStatisticsString_();
     549              :                         }
     550              : 
     551              :                         // Turn on lvls (mem and/or slow) 3,13,14 to log every send.
     552            0 :                         TLOG(((fragment_count_ == 1) ? TLVL_DEBUG + 32
     553              :                                                      : (((fragment_count_ % 250) == 0 || readyToReport) ? TLVL_DEBUG + 36 : TLVL_DEBUG + 37)))
     554            0 :                             << ((fragment_count_ == 1)
     555            0 :                                     ? "Sent first Fragment"
     556            0 :                                     : "Sending fragment " + std::to_string(fragment_count_))
     557            0 :                             << " with SeqID " << sequence_id << ".";
     558              :                 }
     559            0 :                 if (statsHelper_.statsRollingWindowHasMoved()) { sendMetrics_(); }
     560            0 :                 frags.clear();
     561            0 :                 std::this_thread::yield();
     562              :         }
     563              : 
     564            0 :         sender_ptr_.reset(nullptr);
     565              : 
     566              :         // 11-May-2015, KAB: call MetricManager::do_stop whenever we exit the
     567              :         // processing fragments loop so that metrics correctly go to zero when
     568              :         // there is no data flowing
     569            0 :         metricMan->do_stop();
     570              : 
     571            0 :         TLOG(TLVL_DEBUG + 32) << "send_fragments loop end";
     572            0 : }
     573              : 
     574            0 : std::string artdaq::BoardReaderCore::report(std::string const& which) const
     575              : {
     576            0 :         std::string resultString;
     577              : 
     578              :         // pass the request to the FragmentGenerator instance, if it's available
     579            0 :         if (generator_ptr_ != nullptr && which != "core")
     580              :         {
     581            0 :                 resultString = generator_ptr_->ReportCmd(which);
     582            0 :                 if (resultString.length() > 0) { return resultString; }
     583              :         }
     584              : 
     585              :         // handle the request at this level, if we can
     586              :         // --> nothing here yet
     587              : 
     588              :         // if we haven't been able to come up with any report so far, say so
     589            0 :         std::string tmpString = app_name + " run number = ";
     590            0 :         tmpString.append(boost::lexical_cast<std::string>(run_id_.run()));
     591              : 
     592            0 :         tmpString.append(", Sent Fragment count = ");
     593            0 :         tmpString.append(boost::lexical_cast<std::string>(fragment_count_));
     594            0 :         if (which == "core")
     595              :         {
     596              :                 // do nothing
     597              :         }
     598              :         //-----------------------------------------------------------------------------
     599              :         // P.Murat: add statistics report, the const/non const confusion to be cleaned up
     600              :         //          by the maintainers
     601              :         //-----------------------------------------------------------------------------
     602            0 :         else if (which == "stats")
     603              :         {
     604            0 :                 auto non_const_this = (artdaq::BoardReaderCore*)this;
     605            0 :                 tmpString += ", " + non_const_this->buildStatisticsString_();
     606              :         }
     607              :         else
     608              :         {
     609            0 :                 tmpString.append(". Command=\"" + which + "\" is not currently supported.");
     610              :         }
     611            0 :         return tmpString;
     612            0 : }
     613              : 
     614            0 : bool artdaq::BoardReaderCore::metaCommand(std::string const& command, std::string const& arg)
     615              : {
     616            0 :         TLOG(TLVL_DEBUG + 32) << "metaCommand method called with "
     617            0 :                               << "command = \"" << command << "\""
     618            0 :                               << ", arg = \"" << arg << "\""
     619            0 :                               << ".";
     620              : 
     621            0 :         if (generator_ptr_)
     622              :         {
     623            0 :                 return generator_ptr_->metaCommand(command, arg);
     624              :         }
     625              : 
     626            0 :         return true;
     627              : }
     628              : 
     629            0 : std::string artdaq::BoardReaderCore::buildStatisticsString_()
     630              : {
     631            0 :         std::ostringstream oss;
     632            0 :         double fragmentsGeneratedCount = 1.0;
     633            0 :         double fragmentsOutputCount = 1.0;
     634            0 :         oss << app_name << " statistics:" << std::endl;
     635              : 
     636            0 :         oss << "  Fragments read: ";
     637            0 :         artdaq::MonitoredQuantityPtr mqPtr = artdaq::StatisticsCollection::getInstance().getMonitoredQuantity(FRAGMENTS_PER_READ_STAT_KEY);
     638            0 :         if (mqPtr.get() != nullptr)
     639              :         {
     640            0 :                 artdaq::MonitoredQuantityStats stats;
     641            0 :                 mqPtr->getStats(stats);
     642            0 :                 oss << stats.recentValueSum << " fragments generated at "
     643            0 :                     << stats.recentSampleRate << " getNext calls/sec, fragment rate = "
     644            0 :                     << stats.recentValueRate << " fragments/sec, monitor window = "
     645            0 :                     << stats.recentDuration << " sec, min::max read size = "
     646            0 :                     << stats.recentValueMin
     647            0 :                     << "::"
     648            0 :                     << stats.recentValueMax
     649            0 :                     << " fragments";
     650            0 :                 fragmentsGeneratedCount = std::max(double(stats.recentSampleCount), 1.0);
     651            0 :                 oss << " Average times per fragment: ";
     652            0 :                 if (stats.recentSampleRate > 0.0)
     653              :                 {
     654            0 :                         oss << " elapsed time = "
     655            0 :                             << (1.0 / stats.recentSampleRate) << " sec";
     656              :                 }
     657            0 :         }
     658              : 
     659            0 :         oss << std::endl;
     660            0 :         mqPtr = artdaq::StatisticsCollection::getInstance().getMonitoredQuantity(FRAGMENTS_PROCESSED_STAT_KEY);
     661            0 :         if (mqPtr.get() != nullptr)
     662              :         {
     663            0 :                 artdaq::MonitoredQuantityStats stats;
     664            0 :                 mqPtr->getStats(stats);
     665            0 :                 oss << "  Fragment output statistics: "
     666            0 :                     << stats.recentSampleCount << " fragments sent at "
     667            0 :                     << stats.recentSampleRate << " fragments/sec, effective data rate = "
     668            0 :                     << (stats.recentValueRate / 1024.0 / 1024.0) << " MB/sec, monitor window = "
     669            0 :                     << stats.recentDuration << " sec, min::max event size = "
     670            0 :                     << (stats.recentValueMin / 1024.0 / 1024.0)
     671            0 :                     << "::"
     672            0 :                     << (stats.recentValueMax / 1024.0 / 1024.0)
     673            0 :                     << " MB" << std::endl;
     674            0 :                 fragmentsOutputCount = std::max(double(stats.recentSampleCount), 1.0);
     675            0 :         }
     676              : 
     677              :         // 31-Dec-2014, KAB - Just a reminder that using "fragmentCount" in the
     678              :         // denominator of the calculations below is important because the way that
     679              :         // the accumulation of these statistics is done is not fragment-by-fragment
     680              :         // but read-by-read (where each read can contain multiple fragments).
     681              :         // 29-Aug-2016, KAB - BRSYNC_WAIT and OUTPUT_WAIT are now done fragment-by-
     682              :         // fragment, but we'll leave the calculation the same. (The alternative
     683              :         // would be to use recentValueAverage().)
     684              : 
     685            0 :         mqPtr = artdaq::StatisticsCollection::getInstance().getMonitoredQuantity(INPUT_WAIT_STAT_KEY);
     686            0 :         if (mqPtr.get() != nullptr)
     687              :         {
     688            0 :                 oss << "  Input wait time = "
     689            0 :                     << (mqPtr->getRecentValueSum() / fragmentsGeneratedCount) << " s/fragment";
     690              :         }
     691            0 :         mqPtr = artdaq::StatisticsCollection::getInstance().getMonitoredQuantity(BUFFER_WAIT_STAT_KEY);
     692            0 :         if (mqPtr.get() != 0)
     693              :         {
     694            0 :                 oss << ", buffer wait time = "
     695            0 :                     << (mqPtr->getRecentValueSum() / fragmentsGeneratedCount) << " s/fragment";
     696              :         }
     697            0 :         mqPtr = artdaq::StatisticsCollection::getInstance().getMonitoredQuantity(REQUEST_WAIT_STAT_KEY);
     698            0 :         if (mqPtr.get() != 0)
     699              :         {
     700            0 :                 oss << ", request wait time = "
     701            0 :                     << (mqPtr->getRecentValueSum() / fragmentsOutputCount) << " s/fragment";
     702              :         }
     703              : 
     704            0 :         mqPtr = artdaq::StatisticsCollection::getInstance().getMonitoredQuantity(OUTPUT_WAIT_STAT_KEY);
     705            0 :         if (mqPtr.get() != nullptr)
     706              :         {
     707            0 :                 oss << ", output wait time = "
     708            0 :                     << (mqPtr->getRecentValueSum() / fragmentsOutputCount) << " s/fragment";
     709              :         }
     710              :         //-----------------------------------------------------------------------------
     711              :         // 2024-01-13 P.Murat: add SHM data
     712              :         //-----------------------------------------------------------------------------
     713            0 :         oss << fragment_buffer_ptr_->getStatReport();
     714              : 
     715            0 :         return oss.str();
     716            0 : }
     717              : 
     718            0 : void artdaq::BoardReaderCore::sendMetrics_()
     719              : {
     720              :         // TLOG(TLVL_DEBUG + 32) << "Sending metrics " << __LINE__ ;
     721            0 :         double fragmentCount = 1.0;
     722            0 :         artdaq::MonitoredQuantityPtr mqPtr = artdaq::StatisticsCollection::getInstance().getMonitoredQuantity(FRAGMENTS_PROCESSED_STAT_KEY);
     723            0 :         if (mqPtr.get() != nullptr)
     724              :         {
     725            0 :                 artdaq::MonitoredQuantityStats stats;
     726            0 :                 mqPtr->getStats(stats);
     727            0 :                 fragmentCount = std::max(double(stats.recentSampleCount), 1.0);
     728            0 :                 metricMan->sendMetric("Fragment Count", stats.fullSampleCount, "fragments", 1, MetricMode::LastPoint);
     729            0 :                 metricMan->sendMetric("Fragment Rate", stats.recentSampleRate, "fragments/sec", 1, MetricMode::Average);
     730            0 :                 metricMan->sendMetric("Average Fragment Size", stats.recentValueAverage, "bytes/fragment", 2, MetricMode::Average);
     731            0 :                 metricMan->sendMetric("Data Rate", stats.recentValueRate, "bytes/sec", 2, MetricMode::Average);
     732            0 :         }
     733              : 
     734              :         // 31-Dec-2014, KAB - Just a reminder that using "fragmentCount" in the
     735              :         // denominator of the calculations below is important because the way that
     736              :         // the accumulation of these statistics is done is not fragment-by-fragment
     737              :         // but read-by-read (where each read can contain multiple fragments).
     738              :         // 29-Aug-2016, KAB - BRSYNC_WAIT and OUTPUT_WAIT are now done fragment-by-
     739              :         // fragment, but we'll leave the calculation the same. (The alternative
     740              :         // would be to use recentValueAverage().)
     741              : 
     742            0 :         mqPtr = artdaq::StatisticsCollection::getInstance().getMonitoredQuantity(INPUT_WAIT_STAT_KEY);
     743            0 :         if (mqPtr.get() != nullptr)
     744              :         {
     745            0 :                 metricMan->sendMetric("Avg Input Wait Time", (mqPtr->getRecentValueSum() / fragmentCount), "seconds/fragment", 3, MetricMode::Average);
     746              :         }
     747              : 
     748            0 :         mqPtr = artdaq::StatisticsCollection::getInstance().getMonitoredQuantity(BUFFER_WAIT_STAT_KEY);
     749            0 :         if (mqPtr.get() != 0)
     750              :         {
     751            0 :                 metricMan->sendMetric("Avg Buffer Wait Time", (mqPtr->getRecentValueSum() / fragmentCount), "seconds/fragment", 3, MetricMode::Average);
     752              :         }
     753            0 :         mqPtr = artdaq::StatisticsCollection::getInstance().getMonitoredQuantity(REQUEST_WAIT_STAT_KEY);
     754            0 :         if (mqPtr.get() != 0)
     755              :         {
     756            0 :                 metricMan->sendMetric("Avg Request Response Wait Time", (mqPtr->getRecentValueSum() / fragmentCount), "seconds/fragment", 3, MetricMode::Average);
     757              :         }
     758            0 :         mqPtr = artdaq::StatisticsCollection::getInstance().getMonitoredQuantity(OUTPUT_WAIT_STAT_KEY);
     759            0 :         if (mqPtr.get() != nullptr)
     760              :         {
     761            0 :                 metricMan->sendMetric("Avg Output Wait Time", (mqPtr->getRecentValueSum() / fragmentCount), "seconds/fragment", 3, MetricMode::Average);
     762              :         }
     763              : 
     764            0 :         mqPtr = artdaq::StatisticsCollection::getInstance().getMonitoredQuantity(FRAGMENTS_PER_READ_STAT_KEY);
     765            0 :         if (mqPtr.get() != nullptr)
     766              :         {
     767            0 :                 metricMan->sendMetric("Avg Frags Per Read", mqPtr->getRecentValueAverage(), "fragments/read", 4, MetricMode::Average);
     768              :         }
     769            0 : }
        

Generated by: LCOV version 2.0-1