LCOV - code coverage report
Current view: top level - artdaq/Generators - CommandableFragmentGenerator.cc (source / functions) Coverage Total Hit
Test: artdaq.info.cleaned Lines: 66.5 % 227 151
Test Date: 2025-09-04 00:45:34 Functions: 63.2 % 57 36

            Line data    Source code
       1              : #include "TRACE/tracemf.h"  // Pre-empt TRACE/trace.h from Fragment.hh.
       2              : #include "artdaq/DAQdata/Globals.hh"
       3              : #define TRACE_NAME (app_name + "_CommandableFragmentGenerator").c_str()  // include these 2 first -
       4              : 
       5              : #include "artdaq/Generators/CommandableFragmentGenerator.hh"
       6              : 
       7              : #include "artdaq-core/Data/ContainerFragmentLoader.hh"
       8              : #include "artdaq-core/Data/Fragment.hh"
       9              : #include "artdaq-core/Utilities/TimeUtils.hh"
      10              : #include "artdaq/DAQdata/TCPConnect.hh"
      11              : 
      12              : #include "cetlib_except/exception.h"
      13              : #include "fhiclcpp/ParameterSet.h"
      14              : 
      15              : #include <boost/exception/all.hpp>
      16              : #include <boost/lexical_cast.hpp>
      17              : #include <boost/thread.hpp>
      18              : 
      19              : #include <sys/poll.h>
      20              : #include <algorithm>
      21              : #include <chrono>
      22              : #include <exception>
      23              : #include <fstream>
      24              : #include <iomanip>
      25              : #include <iostream>
      26              : #include <iterator>
      27              : #include <limits>
      28              : #include <memory>
      29              : #include <mutex>
      30              : #include <thread>
      31              : 
      32              : #define TLVL_GETNEXT 35
      33              : #define TLVL_GETNEXT_VERBOSE 36
      34              : #define TLVL_CHECKSTOP 37
      35              : #define TLVL_EVCOUNTERINC 38
      36              : #define TLVL_GETDATALOOP 39
      37              : #define TLVL_GETDATALOOP_DATABUFFWAIT 40
      38              : #define TLVL_GETDATALOOP_VERBOSE 41
      39              : #define TLVL_WAITFORBUFFERREADY 42
      40              : #define TLVL_GETBUFFERSTATS 43
      41              : #define TLVL_CHECKDATABUFFER 44
      42              : #define TLVL_GETMONITORINGDATA 45
      43              : #define TLVL_APPLYREQUESTS 46
      44              : #define TLVL_SENDEMPTYFRAGMENTS 47
      45              : #define TLVL_CHECKWINDOWS 48
      46              : #define TLVL_EMPTYFRAGMENT 49
      47              : 
      48            7 : artdaq::CommandableFragmentGenerator::CommandableFragmentGenerator(const fhicl::ParameterSet& ps)
      49            7 :     : mutex_()
      50           14 :     , useMonitoringThread_(ps.get<bool>("separate_monitoring_thread", false))
      51           14 :     , monitoringInterval_(ps.get<int64_t>("hardware_poll_interval_us", 0))
      52            7 :     , isHardwareOK_(true)
      53            7 :     , run_number_(-1)
      54            7 :     , subrun_number_(-1)
      55            7 :     , timeout_(std::numeric_limits<uint64_t>::max())
      56            7 :     , timestamp_(std::numeric_limits<uint64_t>::max())
      57            7 :     , should_stop_(true)
      58            7 :     , exception_(false)
      59           14 :     , latest_exception_report_("none")
      60            7 :     , ev_counter_(1)
      61           28 :     , sleep_on_stop_us_(0)
      62              : {
      63           21 :         auto fragment_ids = ps.get<std::vector<artdaq::Fragment::fragment_id_t>>("fragment_ids", std::vector<artdaq::Fragment::fragment_id_t>());
      64              : 
      65           14 :         TLOG(TLVL_DEBUG + 33) << "artdaq::CommandableFragmentGenerator::CommandableFragmentGenerator(ps)";
      66           14 :         int fragment_id = ps.get<int>("fragment_id", -99);
      67              : 
      68            7 :         if (fragment_id != -99)
      69              :         {
      70            6 :                 if (!fragment_ids.empty())
      71              :                 {
      72            0 :                         latest_exception_report_ = R"(Error in CommandableFragmentGenerator: can't both define "fragment_id" and "fragment_ids" in FHiCL document)";
      73            0 :                         TLOG(TLVL_ERROR) << latest_exception_report_;
      74            0 :                         throw cet::exception(latest_exception_report_);
      75              :                 }
      76              : 
      77            6 :                 fragment_ids.emplace_back(fragment_id);
      78              :         }
      79              : 
      80           14 :         if (ps.has_key("generated_fragments_per_event"))
      81              :         {
      82            3 :                 TLOG(TLVL_WARNING) << "Ignoring deprecated configuration parameter \"generated_fragments_per_event\"";
      83              :         }
      84              : 
      85            7 :         int first_fragment_id = std::numeric_limits<int>::max();
      86           16 :         for (auto& id : fragment_ids)
      87              :         {
      88            9 :                 if (id < first_fragment_id) first_fragment_id = id;
      89            9 :                 expectedTypes_[id] = artdaq::Fragment::EmptyFragmentType;
      90              :         }
      91            7 :         instance_name_for_metrics_ = app_name + "." + boost::lexical_cast<std::string>(first_fragment_id);
      92              : 
      93           14 :         sleep_on_stop_us_ = ps.get<int>("sleep_on_stop_us", 0);
      94            7 : }
      95              : 
      96            7 : artdaq::CommandableFragmentGenerator::~CommandableFragmentGenerator()
      97              : {
      98            7 :         joinThreads();
      99            7 : }
     100              : 
     101           19 : void artdaq::CommandableFragmentGenerator::joinThreads()
     102              : {
     103           19 :         should_stop_ = true;
     104           38 :         TLOG(TLVL_DEBUG + 32) << "Joining monitoringThread";
     105              :         try
     106              :         {
     107           19 :                 if (monitoringThread_.joinable())
     108              :                 {
     109            1 :                         monitoringThread_.join();
     110              :                 }
     111              :         }
     112            0 :         catch (...)
     113              :         {
     114              :                 // IGNORED
     115            0 :         }
     116           38 :         TLOG(TLVL_DEBUG + 32) << "joinThreads complete";
     117           19 : }
     118              : 
     119           38 : bool artdaq::CommandableFragmentGenerator::getNext(FragmentPtrs& output)
     120              : {
     121           38 :         bool result = true;
     122              : 
     123           38 :         if (check_stop()) usleep(sleep_on_stop_us_);
     124           38 :         if (exception() || should_stop_) return false;
     125              : 
     126           32 :         if (!useMonitoringThread_ && monitoringInterval_ > 0)
     127              :         {
     128            4 :                 TLOG(TLVL_GETNEXT) << "getNext: Checking whether to collect Monitoring Data";
     129            2 :                 auto now = std::chrono::steady_clock::now();
     130              : 
     131            2 :                 if (TimeUtils::GetElapsedTimeMicroseconds(lastMonitoringCall_, now) >= static_cast<size_t>(monitoringInterval_))
     132              :                 {
     133            4 :                         TLOG(TLVL_GETNEXT) << "getNext: Collecting Monitoring Data";
     134            2 :                         isHardwareOK_ = checkHWStatus_();
     135            4 :                         TLOG(TLVL_GETNEXT) << "getNext: isHardwareOK_ is now " << std::boolalpha << isHardwareOK_;
     136            2 :                         lastMonitoringCall_ = now;
     137              :                 }
     138              :         }
     139              : 
     140              :         try
     141              :         {
     142           32 :                 std::lock_guard<std::mutex> lk(mutex_);
     143           32 :                 if (!isHardwareOK_)
     144              :                 {
     145            6 :                         TLOG(TLVL_ERROR) << "Stopping CFG because the hardware reports bad status!";
     146            2 :                         return false;
     147              :                 }
     148           60 :                 TLOG(TLVL_DEBUG + 33) << "getNext: Calling getNext_ w/ ev_counter()=" << ev_counter();
     149              :                 try
     150              :                 {
     151           30 :                         result = getNext_(output);
     152              :                 }
     153            0 :                 catch (...)
     154              :                 {
     155            0 :                         throw;
     156            0 :                 }
     157           60 :                 TLOG(TLVL_DEBUG + 33) << "getNext: Done with getNext_ - ev_counter() now " << ev_counter();
     158           62 :                 for (auto& dataIter : output)
     159              :                 {
     160           64 :                         TLOG(TLVL_GETNEXT_VERBOSE) << "getNext: getNext_() returned fragment with sequenceID = " << dataIter->sequenceID()
     161            0 :                                                    << ", type = " << dataIter->typeString() << ", id = " << std::to_string(dataIter->fragmentID())
     162           32 :                                                    << ", timestamp = " << dataIter->timestamp() << ", and sizeBytes = " << dataIter->sizeBytes();
     163              : 
     164           32 :                         auto fragId = dataIter->fragmentID();
     165           32 :                         auto type = dataIter->type();
     166              : 
     167              :                         // ELF, 2020 July 16: System Fragments are excluded from these checks
     168           32 :                         if (Fragment::isSystemFragmentType(type))
     169              :                         {
     170            0 :                                 continue;
     171              :                         }
     172              : 
     173           32 :                         if (!expectedTypes_.count(fragId))
     174              :                         {
     175            0 :                                 TLOG(TLVL_ERROR) << "Received Fragment with Fragment ID " << fragId << ", which is not in the declared list of Fragment IDs! Aborting!";
     176            0 :                                 return false;
     177              :                         }
     178           32 :                         if (expectedTypes_[fragId] == Fragment::EmptyFragmentType)
     179            9 :                                 expectedTypes_[fragId] = type;
     180           23 :                         else if (expectedTypes_[fragId] != type)
     181              :                         {
     182            0 :                                 TLOG(TLVL_WARNING) << "Received Fragment with Fragment ID " << fragId << " and type " << dataIter->typeString() << "(" << type << "), which does not match expected type for this ID (" << expectedTypes_[fragId] << ")";
     183              :                         }
     184              :                 }
     185           32 :         }
     186            0 :         catch (const cet::exception& e)
     187              :         {
     188            0 :                 latest_exception_report_ = "cet::exception caught in getNext(): ";
     189            0 :                 latest_exception_report_.append(e.what());
     190            0 :                 TLOG(TLVL_ERROR) << "getNext: cet::exception caught: " << e;
     191            0 :                 set_exception(true);
     192            0 :                 return false;
     193            0 :         }
     194            0 :         catch (const boost::exception& e)
     195              :         {
     196            0 :                 latest_exception_report_ = "boost::exception caught in getNext(): ";
     197            0 :                 latest_exception_report_.append(boost::diagnostic_information(e));
     198            0 :                 TLOG(TLVL_ERROR) << "getNext: boost::exception caught: " << boost::diagnostic_information(e);
     199            0 :                 set_exception(true);
     200            0 :                 return false;
     201            0 :         }
     202            0 :         catch (const std::exception& e)
     203              :         {
     204            0 :                 latest_exception_report_ = "std::exception caught in getNext(): ";
     205            0 :                 latest_exception_report_.append(e.what());
     206            0 :                 TLOG(TLVL_ERROR) << "getNext: std::exception caught: " << e.what();
     207            0 :                 set_exception(true);
     208            0 :                 return false;
     209            0 :         }
     210            0 :         catch (...)
     211              :         {
     212            0 :                 latest_exception_report_ = "Unknown exception caught in getNext().";
     213            0 :                 TLOG(TLVL_ERROR) << "getNext: unknown exception caught";
     214            0 :                 set_exception(true);
     215            0 :                 return false;
     216            0 :         }
     217              : 
     218           30 :         if (!result)
     219              :         {
     220            0 :                 TLOG(TLVL_DEBUG + 32) << "getNext: Either getNext_ or applyRequests returned false, stopping";
     221              :         }
     222              : 
     223           30 :         if (metricMan && !output.empty())
     224              :         {
     225           29 :                 auto timestamp = output.front()->timestamp();
     226              : 
     227           29 :                 if (output.size() > 1)
     228              :                 {  // Only bother sorting if >1 entry
     229            7 :                         for (auto& outputfrag : output)
     230              :                         {
     231            5 :                                 if (outputfrag->timestamp() > timestamp)
     232              :                                 {
     233            0 :                                         timestamp = outputfrag->timestamp();
     234              :                                 }
     235              :                         }
     236              :                 }
     237              : 
     238          203 :                 metricMan->sendMetric("Last Timestamp", timestamp, "Ticks", 1, MetricMode::LastPoint);
     239              :         }
     240              : 
     241           30 :         return result;
     242              : }
     243              : 
     244           38 : bool artdaq::CommandableFragmentGenerator::check_stop()
     245              : {
     246           76 :         TLOG(TLVL_CHECKSTOP) << "CFG::check_stop: should_stop=" << should_stop() << ", exception status =" << int(exception());
     247              : 
     248           38 :         if (!should_stop()) return false;
     249              : 
     250            6 :         return true;
     251              : }
     252              : 
     253           29 : size_t artdaq::CommandableFragmentGenerator::ev_counter_inc(size_t step)
     254              : {
     255           58 :         TLOG(TLVL_EVCOUNTERINC) << "ev_counter_inc: Incrementing ev_counter from " << ev_counter() << " by " << step;
     256           58 :         return ev_counter_.fetch_add(step);
     257              : }  // returns the prev value
     258              : 
     259            8 : void artdaq::CommandableFragmentGenerator::StartCmd(int run, uint64_t timeout, uint64_t timestamp)
     260              : {
     261           16 :         TLOG(TLVL_DEBUG + 33) << "Start Command received.";
     262            8 :         if (run < 0)
     263              :         {
     264            0 :                 TLOG(TLVL_ERROR) << "negative run number";
     265            0 :                 throw cet::exception("CommandableFragmentGenerator") << "negative run number";  // NOLINT(cert-err60-cpp)
     266              :         }
     267              : 
     268            8 :         timeout_ = timeout;
     269            8 :         timestamp_ = timestamp;
     270            8 :         ev_counter_.store(1);
     271              : 
     272            8 :         should_stop_.store(false);
     273            8 :         exception_.store(false);
     274            8 :         run_number_ = run;
     275            8 :         subrun_number_ = 1;
     276            8 :         latest_exception_report_ = "none";
     277              : 
     278            8 :         start();
     279              : 
     280            8 :         std::unique_lock<std::mutex> lk(mutex_);
     281            8 :         if (useMonitoringThread_) startMonitoringThread();
     282           16 :         TLOG(TLVL_DEBUG + 33) << "Start Command complete.";
     283            8 : }
     284              : 
     285            4 : void artdaq::CommandableFragmentGenerator::StopCmd(uint64_t timeout, uint64_t timestamp)
     286              : {
     287            8 :         TLOG(TLVL_DEBUG + 33) << "Stop Command received.";
     288              : 
     289            4 :         timeout_ = timeout;
     290            4 :         timestamp_ = timestamp;
     291              : 
     292            4 :         stopNoMutex();
     293            4 :         should_stop_.store(true);
     294            4 :         std::unique_lock<std::mutex> lk(mutex_);
     295            4 :         stop();
     296              : 
     297            4 :         joinThreads();
     298            8 :         TLOG(TLVL_DEBUG + 33) << "Stop Command complete.";
     299            4 : }
     300              : 
     301            1 : void artdaq::CommandableFragmentGenerator::PauseCmd(uint64_t timeout, uint64_t timestamp)
     302              : {
     303            2 :         TLOG(TLVL_DEBUG + 33) << "Pause Command received.";
     304            1 :         timeout_ = timeout;
     305            1 :         timestamp_ = timestamp;
     306              : 
     307            1 :         pauseNoMutex();
     308            1 :         should_stop_.store(true);
     309            1 :         std::unique_lock<std::mutex> lk(mutex_);
     310              : 
     311            1 :         pause();
     312            1 : }
     313              : 
     314            1 : void artdaq::CommandableFragmentGenerator::ResumeCmd(uint64_t timeout, uint64_t timestamp)
     315              : {
     316            2 :         TLOG(TLVL_DEBUG + 33) << "Resume Command received.";
     317            1 :         timeout_ = timeout;
     318            1 :         timestamp_ = timestamp;
     319              : 
     320            1 :         subrun_number_ += 1;
     321            1 :         should_stop_ = false;
     322              : 
     323              :         // no lock required: thread not started yet
     324            1 :         resume();
     325              : 
     326            1 :         std::unique_lock<std::mutex> lk(mutex_);
     327              :         // if (useDataThread_) startDataThread();
     328              :         // if (useMonitoringThread_) startMonitoringThread();
     329            2 :         TLOG(TLVL_DEBUG + 33) << "Resume Command complete.";
     330            1 : }
     331              : 
     332            0 : std::string artdaq::CommandableFragmentGenerator::ReportCmd(std::string const& which)
     333              : {
     334            0 :         TLOG(TLVL_DEBUG + 33) << "Report Command received.";
     335            0 :         std::lock_guard<std::mutex> lk(mutex_);
     336              : 
     337              :         // 14-May-2015, KAB: please see the comments associated with the report()
     338              :         // methods in the CommandableFragmentGenerator.hh file for more information
     339              :         // on the use of those methods in this method.
     340              : 
     341              :         // check if the child class has something meaningful for this request
     342            0 :         std::string childReport = reportSpecific(which);
     343            0 :         if (childReport.length() > 0) { return childReport; }
     344              : 
     345              :         // handle the requests that we can take care of at this level
     346            0 :         if (which == "latest_exception")
     347              :         {
     348            0 :                 return latest_exception_report_;
     349              :         }
     350              : 
     351              :         // check if the child class has provided a catch-all report function
     352            0 :         childReport = report();
     353            0 :         if (childReport.length() > 0) { return childReport; }
     354              : 
     355              :         // ELF: 5/31/2019: Let BoardReaderCore's report handle this...
     356              :         /*
     357              :         // if we haven't been able to come up with any report so far, say so
     358              :         std::string tmpString = "The \"" + which + "\" command is not ";
     359              :         tmpString.append("currently supported by the ");
     360              :         tmpString.append(metricsReportingInstanceName());
     361              :         tmpString.append(" fragment generator.");
     362              :         */
     363            0 :         TLOG(TLVL_DEBUG + 33) << "Report Command complete.";
     364            0 :         return "";  // tmpString;
     365            0 : }
     366              : 
     367              : // Default implemenetations of state functions
     368            1 : void artdaq::CommandableFragmentGenerator::pauseNoMutex()
     369              : {
     370              : #pragma message "Using default implementation of CommandableFragmentGenerator::pauseNoMutex()"
     371            1 : }
     372              : 
     373            0 : void artdaq::CommandableFragmentGenerator::pause()
     374              : {
     375              : #pragma message "Using default implementation of CommandableFragmentGenerator::pause()"
     376            0 : }
     377              : 
     378            0 : void artdaq::CommandableFragmentGenerator::resume(){
     379              : #pragma message "Using default implementation of CommandableFragmentGenerator::resume()"
     380            0 : }
     381              : 
     382            0 : std::string artdaq::CommandableFragmentGenerator::report()
     383              : {
     384              : #pragma message "Using default implementation of CommandableFragmentGenerator::report()"
     385            0 :         return "";
     386              : }
     387              : 
     388            0 : std::string artdaq::CommandableFragmentGenerator::reportSpecific(std::string const& /*unused*/)
     389              : {
     390              : #pragma message "Using default implementation of CommandableFragmentGenerator::reportSpecific(std::string)"
     391            0 :         return "";
     392              : }
     393              : 
     394            0 : bool artdaq::CommandableFragmentGenerator::checkHWStatus_()
     395              : {
     396              : #pragma message "Using default implementation of CommandableFragmentGenerator::checkHWStatus_()"
     397            0 :         return true;
     398              : }
     399              : 
     400            0 : bool artdaq::CommandableFragmentGenerator::metaCommand(std::string const& /*unused*/, std::string const& /*unused*/)
     401              : {
     402              : #pragma message "Using default implementation of CommandableFragmentGenerator::metaCommand(std::string, std::string)"
     403            0 :         return true;
     404              : }
     405              : 
     406            1 : void artdaq::CommandableFragmentGenerator::startMonitoringThread()
     407              : {
     408            1 :         if (monitoringThread_.joinable())
     409              :         {
     410            0 :                 monitoringThread_.join();
     411              :         }
     412            3 :         TLOG(TLVL_INFO) << "Starting Hardware Monitoring Thread";
     413              :         try
     414              :         {
     415            1 :                 monitoringThread_ = boost::thread(&CommandableFragmentGenerator::getMonitoringDataLoop, this);
     416              :                 char tname[16];                                            // Size 16 - see man page pthread_setname_np(3) and/or prctl(2)
     417            1 :                 snprintf(tname, sizeof(tname) - 1, "%d-CFGMon", my_rank);  // NOLINT
     418            1 :                 tname[sizeof(tname) - 1] = '\0';                           // assure term. snprintf is not too evil :)
     419            1 :                 auto handle = monitoringThread_.native_handle();
     420            1 :                 pthread_setname_np(handle, tname);
     421              :         }
     422            0 :         catch (const boost::exception& e)
     423              :         {
     424            0 :                 TLOG(TLVL_ERROR) << "Caught boost::exception starting Hardware Monitoring thread: " << boost::diagnostic_information(e) << ", errno=" << errno;
     425            0 :                 throw cet::exception("ThreadError") << "Caught boost::exception starting Hardware Monitoring thread: " << boost::diagnostic_information(e) << ", errno=" << errno << std::endl;
     426            0 :         }
     427            1 : }
     428              : 
     429            1 : void artdaq::CommandableFragmentGenerator::getMonitoringDataLoop()
     430              : {
     431           15 :         while (!should_stop())
     432              :         {
     433           14 :                 if (should_stop() || monitoringInterval_ <= 0)
     434              :                 {
     435            0 :                         TLOG(TLVL_DEBUG + 32) << "getMonitoringDataLoop: should_stop() is " << std::boolalpha << should_stop()
     436            0 :                                               << " and monitoringInterval is " << monitoringInterval_ << ", returning";
     437            0 :                         return;
     438              :                 }
     439           28 :                 TLOG(TLVL_GETMONITORINGDATA) << "getMonitoringDataLoop: Determining whether to call checkHWStatus_";
     440              : 
     441           14 :                 auto now = std::chrono::steady_clock::now();
     442           14 :                 if (TimeUtils::GetElapsedTimeMicroseconds(lastMonitoringCall_, now) >= static_cast<size_t>(monitoringInterval_))
     443              :                 {
     444            2 :                         isHardwareOK_ = checkHWStatus_();
     445            4 :                         TLOG(TLVL_GETMONITORINGDATA) << "getMonitoringDataLoop: isHardwareOK_ is now " << std::boolalpha << isHardwareOK_;
     446            2 :                         lastMonitoringCall_ = now;
     447              :                 }
     448           14 :                 usleep(monitoringInterval_ / 10);
     449              :         }
     450              : }
        

Generated by: LCOV version 2.0-1