LCOV - code coverage report
Current view: top level - test/DAQrate - GenToBuffer_t.cc (source / functions) Coverage Total Hit
Test: artdaq.info.cleaned Lines: 0.0 % 120 0
Test Date: 2025-09-04 00:45:34 Functions: 0.0 % 23 0

            Line data    Source code
       1              : #include "TRACE/tracemf.h"
       2              : #define TRACE_NAME "GenToBuffer"
       3              : 
       4              : #include "artdaq-core/Utilities/configureMessageFacility.hh"
       5              : #include "artdaq/Application/LoadParameterSet.hh"
       6              : #include "artdaq/DAQdata/Globals.hh"
       7              : #include "artdaq/DAQrate/FragmentBuffer.hh"
       8              : #include "artdaq/DAQrate/RequestBuffer.hh"
       9              : #include "artdaq/Generators/CommandableFragmentGenerator.hh"
      10              : #include "artdaq/Generators/makeCommandableFragmentGenerator.hh"
      11              : 
      12              : #include "fhiclcpp/ParameterSet.h"
      13              : #include "fhiclcpp/types/Atom.h"
      14              : #include "fhiclcpp/types/Comment.h"
      15              : #include "fhiclcpp/types/Name.h"
      16              : #include "fhiclcpp/types/Table.h"
      17              : #include "fhiclcpp/types/TableFragment.h"
      18              : 
      19              : #include <boost/program_options.hpp>
      20              : namespace bpo = boost::program_options;
      21              : #include <boost/thread.hpp>
      22              : 
      23              : #include <thread>
      24              : 
      25              : namespace artdaq {
      26              : /**
      27              :  * @brief Test fixture for GenToBuffer_t
      28              :  */
      29              : class GenToBufferTest
      30              : {
      31              : public:
      32              :         /**
      33              :          * @brief GenToBufferTest constructor
      34              :          * @param ps ParameterSet for GenToBufferTest
      35              :          */
      36            0 :         explicit GenToBufferTest(fhicl::ParameterSet const& ps)
      37            0 :             : generator_ptr_(nullptr)
      38            0 :             , fragment_buffer_ptr_(new FragmentBuffer(ps))
      39            0 :             , request_buffer_ptr_(new RequestBuffer(1))
      40            0 :             , fragment_count_(0)
      41            0 :             , running_(false)
      42              :         {
      43            0 :                 generator_ptr_ = makeCommandableFragmentGenerator(ps.get<std::string>("generator"), ps);
      44            0 :                 generator_ptr_->SetRequestBuffer(request_buffer_ptr_);
      45            0 :                 fragment_buffer_ptr_->SetRequestBuffer(request_buffer_ptr_);
      46            0 :         }
      47              : 
      48              :         /**
      49              :          * @brief Start the test fixture
      50              :          * @param run_number Run Number to use in Start commands
      51              :          */
      52            0 :         void start(int run_number)
      53              :         {
      54            0 :                 metricMan->do_start();
      55            0 :                 request_buffer_ptr_->setRunning(true);
      56            0 :                 generator_ptr_->StartCmd(run_number, 0, 0);
      57              : 
      58            0 :                 running_ = true;
      59            0 :                 boost::thread::attributes attrs;
      60            0 :                 attrs.set_stack_size(4096 * 2000);  // 8 MB
      61            0 :                 receive_thread_.reset(new boost::thread(attrs, boost::bind(&GenToBufferTest::receive_fragments, this)));
      62            0 :                 send_thread_.reset(new boost::thread(attrs, boost::bind(&GenToBufferTest::send_fragments, this)));
      63            0 :         }
      64              :         /**
      65              :          * @brief Stop the test fixture
      66              :          */
      67            0 :         void stop()
      68              :         {
      69            0 :                 generator_ptr_->StopCmd(0, 0);
      70            0 :                 fragment_buffer_ptr_->Stop();
      71            0 :                 request_buffer_ptr_->setRunning(false);
      72              : 
      73            0 :                 running_ = false;
      74            0 :                 if (receive_thread_ && receive_thread_->joinable()) receive_thread_->join();
      75            0 :                 if (send_thread_ && send_thread_->joinable()) send_thread_->join();
      76            0 :                 metricMan->do_stop();
      77            0 :         }
      78              : 
      79              :         /**
      80              :          * @brief Get a handle to the RequestBuffer
      81              :          * @return RequestBuffer shared_ptr handle
      82              :          */
      83            0 :         std::shared_ptr<RequestBuffer> GetRequestBuffer() { return request_buffer_ptr_; }
      84              : 
      85              : private:
      86            0 :         void receive_fragments()
      87              :         {
      88            0 :                 TLOG(TLVL_DEBUG) << "Waiting for first fragment.";
      89            0 :                 artdaq::FragmentPtrs frags;
      90              : 
      91            0 :                 bool active = true;
      92              : 
      93            0 :                 while (active && running_)
      94              :                 {
      95            0 :                         auto loop_start = std::chrono::steady_clock::now();
      96            0 :                         TLOG(18) << "receive_fragments getNext start";
      97            0 :                         active = generator_ptr_->getNext(frags);
      98            0 :                         TLOG(18) << "receive_fragments getNext done (active=" << active << ")";
      99            0 :                         auto after_getnext = std::chrono::steady_clock::now();
     100              :                         // 08-May-2015, KAB & JCF: if the generator getNext() method returns false
     101              :                         // (which indicates that the data flow has stopped) *and* the reason that
     102              :                         // it has stopped is because there was an exception that wasn't handled by
     103              :                         // the experiment-specific FragmentGenerator class, we move to the
     104              :                         // InRunError state so that external observers (e.g. RunControl or
     105              :                         // DAQInterface) can see that there was a problem.
     106            0 :                         if (!active && generator_ptr_ && generator_ptr_->exception())
     107              :                         {
     108            0 :                                 TLOG(TLVL_ERROR) << "Generator has an exception, aborting!";
     109            0 :                                 break;
     110              :                         }
     111              : 
     112            0 :                         if (!active) { break; }
     113              : 
     114            0 :                         if (frags.size() > 0)
     115              :                         {
     116            0 :                                 metricMan->sendMetric("Fragments Generated", frags.size(), "fragments", 3, artdaq::MetricMode::Accumulate | artdaq::MetricMode::Rate | artdaq::MetricMode::Average);
     117              : 
     118            0 :                                 TLOG(18) << "receive_fragments AddFragmentsToBuffer start";
     119            0 :                                 fragment_buffer_ptr_->AddFragmentsToBuffer(std::move(frags));
     120            0 :                                 TLOG(18) << "receive_fragments AddFragmentsToBuffer done";
     121            0 :                                 auto after_addFragsToBuffer = std::chrono::steady_clock::now();
     122            0 :                                 metricMan->sendMetric("FragmentBufferAddTime", artdaq::TimeUtils::GetElapsedTime(after_getnext, after_addFragsToBuffer), "s", 3, artdaq::MetricMode::Accumulate | artdaq::MetricMode::Average | artdaq::MetricMode::Minimum | artdaq::MetricMode::Maximum);
     123              :                         }
     124            0 :                         metricMan->sendMetric("GetNextTime", artdaq::TimeUtils::GetElapsedTime(loop_start, after_getnext), "s", 3, artdaq::MetricMode::Accumulate | artdaq::MetricMode::Average | artdaq::MetricMode::Minimum | artdaq::MetricMode::Maximum);
     125              : 
     126            0 :                         frags.clear();
     127              :                 }
     128            0 :                 TLOG(TLVL_DEBUG) << "receive_fragments loop end";
     129            0 :         }
     130              : 
     131            0 :         void send_fragments()
     132              :         {
     133            0 :                 TLOG(TLVL_DEBUG) << "Waiting for first fragment.";
     134            0 :                 artdaq::FragmentPtrs frags;
     135              : 
     136            0 :                 bool active = true;
     137              : 
     138            0 :                 while (active && running_)
     139              :                 {
     140            0 :                         auto loop_start = std::chrono::steady_clock::now();
     141              : 
     142            0 :                         TLOG(18) << "send_fragments applyRequests start";
     143            0 :                         active = fragment_buffer_ptr_->applyRequests(frags);
     144            0 :                         TLOG(18) << "send_fragments applyRequests done (active=" << active << ")";
     145              : 
     146            0 :                         auto after_requests = std::chrono::steady_clock::now();
     147            0 :                         if (!active) { break; }
     148              : 
     149            0 :                         for (auto& fragPtr : frags)
     150              :                         {
     151            0 :                                 if (!fragPtr.get())
     152              :                                 {
     153            0 :                                         TLOG(TLVL_WARNING) << "Encountered a bad fragment pointer in fragment " << fragment_count_ << ". "
     154            0 :                                                            << "This is most likely caused by a problem with the Fragment Generator!";
     155            0 :                                         continue;
     156            0 :                                 }
     157            0 :                                 if (fragment_count_ == 0)
     158              :                                 {
     159            0 :                                         TLOG(TLVL_DEBUG) << "Received first Fragment from Fragment Generator, sequence ID " << fragPtr->sequenceID() << ", size = " << fragPtr->sizeBytes() << " bytes.";
     160              :                                 }
     161            0 :                                 artdaq::Fragment::sequence_id_t sequence_id = fragPtr->sequenceID();
     162            0 :                                 artdaq::Globals::SetMFIteration("Sequence ID " + std::to_string(sequence_id));
     163              : 
     164            0 :                                 TLOG(17) << "send_fragments seq=" << sequence_id << " sendFragment start";
     165            0 :                                 ++fragment_count_;
     166              : 
     167              :                                 // Turn on lvls (mem and/or slow) 3,13,14 to log every send.
     168            0 :                                 TLOG(((fragment_count_ == 1) ? TLVL_DEBUG
     169              :                                                              : (((fragment_count_ % 250) == 0) ? 13 : 14)))
     170            0 :                                     << ((fragment_count_ == 1)
     171            0 :                                             ? "Sent first Fragment"
     172            0 :                                             : "Sending fragment " + std::to_string(fragment_count_))
     173            0 :                                     << " with SeqID " << sequence_id << ".";
     174              :                         }
     175              : 
     176            0 :                         metricMan->sendMetric("Fragments Discarded", frags.size(), "fragments", 3, artdaq::MetricMode::Accumulate | artdaq::MetricMode::Rate | artdaq::MetricMode::Average);
     177            0 :                         frags.clear();
     178            0 :                         auto after_frag_check = std::chrono::steady_clock::now();
     179            0 :                         metricMan->sendMetric("ApplyRequestsTime", artdaq::TimeUtils::GetElapsedTime(loop_start, after_requests), "s", 3, artdaq::MetricMode::Average | artdaq::MetricMode::Accumulate | artdaq::MetricMode::Maximum | artdaq::MetricMode::Minimum);
     180            0 :                         metricMan->sendMetric("FragmentDiscardTime", artdaq::TimeUtils::GetElapsedTime(after_requests, after_frag_check), "s", 3, artdaq::MetricMode::Average | artdaq::MetricMode::Accumulate);
     181              : 
     182            0 :                         std::this_thread::yield();
     183              :                 }
     184              : 
     185              :                 // 11-May-2015, KAB: call MetricManager::do_stop whenever we exit the
     186              :                 // processing fragments loop so that metrics correctly go to zero when
     187              :                 // there is no data flowing
     188            0 :                 metricMan->do_stop();
     189              : 
     190            0 :                 TLOG(TLVL_DEBUG) << "send_fragments loop end";
     191            0 :         }
     192              : 
     193              : private:
     194              :         std::unique_ptr<CommandableFragmentGenerator> generator_ptr_;
     195              :         std::unique_ptr<FragmentBuffer> fragment_buffer_ptr_;
     196              :         std::shared_ptr<RequestBuffer> request_buffer_ptr_;
     197              :         std::atomic<size_t> fragment_count_;
     198              :         std::atomic<bool> running_;
     199              :         std::unique_ptr<boost::thread> receive_thread_;
     200              :         std::unique_ptr<boost::thread> send_thread_;
     201              : };
     202              : }  // namespace artdaq
     203              : 
     204            0 : int main(int argc, char* argv[])
     205              : {
     206            0 :         artdaq::configureMessageFacility("RequestSender");
     207              : 
     208              :         struct FragmentReceiverConfig
     209              :         {
     210              :                 fhicl::TableFragment<artdaq::CommandableFragmentGenerator::Config> generatorConfig;
     211              :                 fhicl::TableFragment<artdaq::FragmentBuffer::Config> fragmentBufferConfig;
     212              :         };
     213              : 
     214              :         struct DAQConfig
     215              :         {
     216              :                 fhicl::Table<FragmentReceiverConfig> frConfig{fhicl::Name{"fragment_receiver"}};
     217              :         };
     218              : 
     219              :         struct Config
     220              :         {
     221              :                 fhicl::Table<DAQConfig> daq{fhicl::Name{"daq"}};
     222              :                 fhicl::Table<artdaq::MetricManager::Config> metrics{fhicl::Name{"metrics"}};
     223              :                 fhicl::Atom<double> test_duration_s{fhicl::Name{"test_duration_s"}, fhicl::Comment{"Duration, in seconds, for the test"}, 60.0};
     224              :                 fhicl::Atom<size_t> time_between_requests_us{fhicl::Name{"time_between_requests_us"}, fhicl::Comment{"Amount of time to wait between generated requests, in us"}, 1000};
     225              :                 fhicl::Atom<artdaq::Fragment::timestamp_t> timestamp_increment{fhicl::Name{"timestamp_increment"}, fhicl::Comment{"Amount to increment the timestamp for each request"}, 1};
     226              :                 fhicl::Atom<int> run_number{fhicl::Name{"run_number"}, fhicl::Comment{"Run Number to use for the test"}, 101};
     227              :         };
     228              : 
     229            0 :         auto pset = LoadParameterSet<Config>(argc, argv, "GenToBuffer", "This test application evaluates the rate of Fragment Generation and Request Application.");
     230            0 :         auto fr_pset = pset;
     231              : 
     232            0 :         metricMan->initialize(pset.get<fhicl::ParameterSet>("metrics", fhicl::ParameterSet()), "GenToBuffer");
     233              : 
     234            0 :         if (pset.has_key("daq"))
     235              :         {
     236            0 :                 fr_pset = pset.get<fhicl::ParameterSet>("daq");
     237              :         }
     238              : 
     239            0 :         if (fr_pset.has_key("fragment_receiver"))
     240              :         {
     241            0 :                 fr_pset = fr_pset.get<fhicl::ParameterSet>("fragment_receiver");
     242              :         }
     243            0 :         artdaq::GenToBufferTest gtbt(fr_pset);
     244              : 
     245            0 :         auto buf = gtbt.GetRequestBuffer();
     246              : 
     247            0 :         auto start_time = std::chrono::steady_clock::now();
     248            0 :         auto duration = pset.get<double>("test_duration_s", 60);
     249            0 :         artdaq::Fragment::sequence_id_t seq = 0;
     250            0 :         artdaq::Fragment::timestamp_t timestamp = 1;
     251            0 :         auto time_between_requests_us = pset.get<size_t>("time_between_requests_us", 1000);
     252            0 :         auto timestamp_scale = pset.get<artdaq::Fragment::timestamp_t>("timestamp_increment", 1);
     253              : 
     254            0 :         gtbt.start(pset.get<int>("run_number", 101));
     255              : 
     256            0 :         while (artdaq::TimeUtils::GetElapsedTime(start_time) < duration)
     257              :         {
     258            0 :                 buf->push(++seq, timestamp);
     259            0 :                 timestamp += timestamp_scale;
     260              : 
     261            0 :                 auto us_since_start = artdaq::TimeUtils::GetElapsedTimeMicroseconds(start_time);
     262            0 :                 int64_t time_diff = seq * time_between_requests_us - us_since_start;
     263            0 :                 TLOG(40) << "Time Diff: " << time_diff << ", Time since start: " << us_since_start << ", current epoch: " << seq * time_between_requests_us;
     264            0 :                 if (time_diff > 10)
     265              :                 {
     266            0 :                         usleep(time_diff);
     267              :                 }
     268              :         }
     269              : 
     270            0 :         gtbt.stop();
     271            0 : }
        

Generated by: LCOV version 2.0-1