LCOV - code coverage report
Current view: top level - artdaq/Application - DataReceiverCore.cc (source / functions) Coverage Total Hit
Test: artdaq.info.cleaned Lines: 0.0 % 158 0
Test Date: 2025-09-04 00:45:34 Functions: 0.0 % 44 0

            Line data    Source code
       1              : #include "TRACE/tracemf.h"
       2              : #include "artdaq/DAQdata/Globals.hh"  // include these 2 first -
       3              : #define TRACE_NAME (app_name + "_DataReceiverCore").c_str()
       4              : 
       5              : #include "artdaq/Application/DataReceiverCore.hh"
       6              : 
       7              : #include "artdaq-core/Utilities/ExceptionHandler.hh"
       8              : 
       9              : #include <memory>
      10              : #include <string>
      11              : 
      12            0 : artdaq::DataReceiverCore::DataReceiverCore()
      13            0 :     : stop_requested_(false)
      14            0 :     , pause_requested_(false)
      15            0 :     , run_is_paused_(false)
      16              : 
      17              : {
      18            0 :         TLOG(TLVL_DEBUG + 32) << "Constructor";
      19            0 : }
      20              : 
      21            0 : artdaq::DataReceiverCore::~DataReceiverCore()
      22              : {
      23            0 :         TLOG(TLVL_DEBUG + 32) << "Destructor";
      24            0 : }
      25              : 
      26            0 : bool artdaq::DataReceiverCore::initializeDataReceiver(fhicl::ParameterSet const& pset, fhicl::ParameterSet const& data_pset, fhicl::ParameterSet const& metric_pset)
      27              : {
      28              :         // other parameters
      29            0 :         verbose_ = data_pset.get<bool>("verbose", true);
      30              : 
      31              :         // TRACE here so that mftrace_module and mftrace_iteration are ready by mftrace...should set it for all subsequent traces
      32            0 :         TLOG(TLVL_INFO) << "Initializing Data Receiver";
      33              : 
      34            0 :         if (metric_pset.is_empty())
      35              :         {
      36            0 :                 TLOG(TLVL_INFO) << "No metric plugins appear to be defined";
      37              :         }
      38              :         try
      39              :         {
      40            0 :                 metricMan->initialize(metric_pset, app_name);
      41              :         }
      42            0 :         catch (...)
      43              :         {
      44            0 :                 ExceptionHandler(ExceptionHandlerRethrow::no,
      45              :                                  "Error loading metrics in DataReceiverCore::initialize()");
      46            0 :         }
      47              : 
      48            0 :         fhicl::ParameterSet art_pset = pset;
      49            0 :         if (art_pset.has_key("art"))
      50              :         {
      51            0 :                 art_pset = art_pset.get<fhicl::ParameterSet>("art");
      52              :         }
      53              :         else
      54              :         {
      55            0 :                 art_pset.erase("daq");
      56              :         }
      57              : 
      58              :         // Add the "metrics" block
      59            0 :         auto art_services_pset = art_pset.get<fhicl::ParameterSet>("services");
      60            0 :         auto art_services_ArtdaqSharedMemoryServiceInterface_pset = art_services_pset.get<fhicl::ParameterSet>("ArtdaqSharedMemoryServiceInterface");
      61            0 :         art_services_ArtdaqSharedMemoryServiceInterface_pset.put<fhicl::ParameterSet>("metrics", metric_pset);
      62            0 :         art_services_pset.erase("ArtdaqSharedMemoryServiceInterface");
      63            0 :         art_services_pset.put<fhicl::ParameterSet>("ArtdaqSharedMemoryServiceInterface", art_services_ArtdaqSharedMemoryServiceInterface_pset);
      64            0 :         art_pset.erase("services");
      65            0 :         art_pset.put<fhicl::ParameterSet>("services", art_services_pset);
      66              : 
      67            0 :         fhicl::ParameterSet data_tmp = data_pset;
      68            0 :         if (data_pset.has_key("expected_events_per_bunch"))
      69              :         {
      70            0 :                 data_tmp.put<int>("expected_fragments_per_event", data_pset.get<int>("expected_events_per_bunch"));
      71              :         }
      72              : 
      73            0 :         if (data_pset.has_key("rank"))
      74              :         {
      75            0 :                 if (my_rank >= 0 && data_pset.get<int>("rank") != my_rank)
      76              :                 {
      77            0 :                         TLOG(TLVL_WARNING) << "Rank specified at startup is different than rank specified at configure! Using rank received at configure!";
      78              :                 }
      79            0 :                 my_rank = data_pset.get<int>("rank");
      80              :         }
      81            0 :         if (my_rank == -1)
      82              :         {
      83            0 :                 TLOG(TLVL_ERROR) << "Rank not specified at startup or in configuration! Aborting";
      84            0 :                 exit(1);
      85              :         }
      86              : 
      87            0 :         event_store_ptr_ = std::make_shared<SharedMemoryEventManager>(data_tmp, art_pset);
      88            0 :         art_pset_ = art_pset;
      89            0 :         TLOG(TLVL_DEBUG + 32) << "Resulting art_pset_: \"" << art_pset_.to_string() << "\".";
      90              : 
      91            0 :         receiver_ptr_ = std::make_unique<artdaq::DataReceiverManager>(data_tmp, event_store_ptr_);
      92              : 
      93            0 :         return true;
      94            0 : }
      95              : 
      96            0 : bool artdaq::DataReceiverCore::start(art::RunID id)
      97              : {
      98            0 :         TLOG((verbose_ ? TLVL_INFO : TLVL_DEBUG + 32)) << "Starting run " << id.run();
      99              : 
     100              :         // 13-Jul-2018, KAB: added code to update the art_pset inside the event store
     101              :         // with configuration archive information
     102              :         // so that the config info will be stored in the output art/ROOT file.
     103              :         // (Note that we don't bother looping over the config_archive_entries if that
     104              :         // map is empty, but we *do* still update the art configuration with art_pset_
     105              :         // at each begin-run because the config archive may be non-empty one time through
     106              :         // and then empty the next time.)
     107            0 :         fhicl::ParameterSet temp_pset = art_pset_;
     108            0 :         if (!config_archive_entries_.empty())
     109              :         {
     110            0 :                 fhicl::ParameterSet config_pset;
     111            0 :                 for (auto& entry : config_archive_entries_)
     112              :                 {
     113            0 :                         config_pset.put(entry.first, entry.second);
     114              :                 }
     115            0 :                 temp_pset.put_or_replace("configuration_documents", config_pset);
     116            0 :         }
     117            0 :         event_store_ptr_->UpdateArtConfiguration(temp_pset);
     118              : 
     119            0 :         stop_requested_.store(false);
     120            0 :         pause_requested_.store(false);
     121            0 :         run_is_paused_.store(false);
     122            0 :         metricMan->do_start();
     123            0 :         event_store_ptr_->startRun(id.run());
     124            0 :         receiver_ptr_->start_threads();
     125              : 
     126            0 :         TLOG((verbose_ ? TLVL_INFO : TLVL_DEBUG + 32)) << "Completed the Start transition for run " << event_store_ptr_->runID();
     127            0 :         return true;
     128            0 : }
     129              : 
     130            0 : bool artdaq::DataReceiverCore::stop()
     131              : {
     132            0 :         TLOG((verbose_ ? TLVL_INFO : TLVL_DEBUG + 32)) << "Stopping run " << event_store_ptr_->runID();
     133            0 :         bool endSucceeded = false;
     134              :         int attemptsToEnd;
     135            0 :         receiver_ptr_->stop_threads();
     136              : 
     137              :         // 21-Jun-2013, KAB - the stop_requested_ variable must be set
     138              :         // before the flush lock so that the processFragments loop will
     139              :         // exit (after the timeout), the lock will be released (in the
     140              :         // processFragments method), and this method can continue.
     141            0 :         stop_requested_.store(true);
     142              : 
     143            0 :         TLOG(TLVL_DEBUG + 32) << "Ending run " << event_store_ptr_->runID();
     144            0 :         attemptsToEnd = 1;
     145            0 :         endSucceeded = event_store_ptr_->endRun();
     146            0 :         while (!endSucceeded && attemptsToEnd < 3)
     147              :         {
     148            0 :                 ++attemptsToEnd;
     149            0 :                 TLOG(TLVL_DEBUG + 32) << "Retrying EventStore::endRun()";
     150            0 :                 endSucceeded = event_store_ptr_->endRun();
     151              :         }
     152            0 :         if (!endSucceeded)
     153              :         {
     154            0 :                 TLOG(TLVL_ERROR)
     155            0 :                     << "EventStore::endRun in stop method failed after three tries.";
     156              :         }
     157            0 :         TLOG(TLVL_DEBUG + 32) << "Done Ending run " << event_store_ptr_->runID();
     158              : 
     159            0 :         attemptsToEnd = 1;
     160            0 :         TLOG(TLVL_DEBUG + 32) << "stop: Calling EventStore::endOfData";
     161            0 :         endSucceeded = event_store_ptr_->endOfData();
     162            0 :         while (!endSucceeded && attemptsToEnd < 3)
     163              :         {
     164            0 :                 ++attemptsToEnd;
     165            0 :                 TLOG(TLVL_DEBUG + 32) << "Retrying EventStore::endOfData()";
     166            0 :                 endSucceeded = event_store_ptr_->endOfData();
     167              :         }
     168              : 
     169            0 :         run_is_paused_.store(false);
     170            0 :         TLOG((verbose_ ? TLVL_INFO : TLVL_DEBUG + 32)) << "Completed the Stop transition for run " << event_store_ptr_->runID();
     171            0 :         return true;
     172              : }
     173              : 
     174            0 : bool artdaq::DataReceiverCore::pause()
     175              : {
     176            0 :         TLOG((verbose_ ? TLVL_INFO : TLVL_DEBUG + 32)) << "Pausing run " << event_store_ptr_->runID();
     177            0 :         pause_requested_.store(true);
     178            0 :         run_is_paused_.store(true);
     179            0 :         TLOG((verbose_ ? TLVL_INFO : TLVL_DEBUG + 32)) << "Completed the Pause transition for run " << event_store_ptr_->runID();
     180            0 :         return true;
     181              : }
     182              : 
     183            0 : bool artdaq::DataReceiverCore::resume()
     184              : {
     185            0 :         TLOG((verbose_ ? TLVL_INFO : TLVL_DEBUG + 32)) << "Resuming run " << event_store_ptr_->runID();
     186            0 :         pause_requested_.store(false);
     187            0 :         metricMan->do_start();
     188            0 :         event_store_ptr_->rolloverSubrun(true);
     189            0 :         run_is_paused_.store(false);
     190            0 :         TLOG((verbose_ ? TLVL_INFO : TLVL_DEBUG + 32)) << "Completed the Resume transition for run " << event_store_ptr_->runID();
     191            0 :         return true;
     192              : }
     193              : 
     194            0 : bool artdaq::DataReceiverCore::shutdown()
     195              : {
     196            0 :         TLOG((verbose_ ? TLVL_INFO : TLVL_DEBUG + 32)) << "Starting Shutdown transition";
     197              : 
     198              :         /* We don't care about flushing data here.  The only way to transition to the
     199              :            shutdown state is from a state where there is no data taking.  All we have
     200              :            to do is signal the art input module that we're done taking data so that
     201              :            it can wrap up whatever it needs to do. */
     202              : 
     203            0 :         TLOG(TLVL_DEBUG + 32) << "shutdown: Shutting down DataReceiverManager";
     204            0 :         receiver_ptr_.reset(nullptr);
     205              : 
     206            0 :         bool endSucceeded = false;
     207            0 :         int attemptsToEnd = 1;
     208            0 :         TLOG(TLVL_DEBUG + 32) << "shutdown: Calling EventStore::endOfData";
     209            0 :         endSucceeded = event_store_ptr_->endOfData();
     210            0 :         while (!endSucceeded && attemptsToEnd < 3)
     211              :         {
     212            0 :                 ++attemptsToEnd;
     213            0 :                 TLOG(TLVL_DEBUG + 32) << "Retrying EventStore::endOfData()";
     214            0 :                 endSucceeded = event_store_ptr_->endOfData();
     215              :         }
     216              : 
     217            0 :         TLOG(TLVL_DEBUG + 32) << "shutdown: Shutting down SharedMemoryEventManager";
     218            0 :         event_store_ptr_.reset();
     219              : 
     220            0 :         TLOG(TLVL_DEBUG + 32) << "shutdown: Shutting down MetricManager";
     221            0 :         metricMan->shutdown();
     222              : 
     223            0 :         TLOG(TLVL_DEBUG + 32) << "shutdown: Complete";
     224            0 :         TLOG((verbose_ ? TLVL_INFO : TLVL_DEBUG + 32)) << "Completed Shutdown transition";
     225            0 :         return endSucceeded;
     226              : }
     227              : 
     228            0 : bool artdaq::DataReceiverCore::soft_initialize(fhicl::ParameterSet const& pset)
     229              : {
     230            0 :         TLOG(TLVL_DEBUG + 32) << "soft_initialize method called with DAQ "
     231            0 :                               << "ParameterSet = \"" << pset.to_string()
     232            0 :                               << "\".";
     233            0 :         return true;
     234              : }
     235              : 
     236            0 : bool artdaq::DataReceiverCore::reinitialize(fhicl::ParameterSet const& pset)
     237              : {
     238            0 :         TLOG(TLVL_DEBUG + 32) << "reinitialize method called with DAQ "
     239            0 :                               << "ParameterSet = \"" << pset.to_string()
     240            0 :                               << "\".";
     241            0 :         event_store_ptr_ = nullptr;
     242            0 :         return initialize(pset);
     243              : }
     244              : 
     245            0 : bool artdaq::DataReceiverCore::rollover_subrun(uint64_t boundary, uint32_t subrun)
     246              : {
     247            0 :         if (event_store_ptr_)
     248              :         {
     249            0 :                 event_store_ptr_->rolloverSubrun(boundary, subrun, true);
     250            0 :                 return true;
     251              :         }
     252            0 :         return false;
     253              : }
     254              : 
     255            0 : std::string artdaq::DataReceiverCore::report(std::string const& which) const
     256              : {
     257            0 :         if (which == "open_event_count")
     258              :         {
     259            0 :                 if (event_store_ptr_ != nullptr)
     260              :                 {
     261            0 :                         return std::to_string(event_store_ptr_->GetOpenEventCount());
     262              :                 }
     263              : 
     264            0 :                 return "-1";
     265              :         }
     266            0 :         if (which == "event_count")
     267              :         {
     268            0 :                 if (receiver_ptr_ != nullptr)
     269              :                 {
     270            0 :                         return std::to_string(receiver_ptr_->count());
     271              :                 }
     272              : 
     273            0 :                 return "0";
     274              :         }
     275            0 :         else if (which == "stats")
     276              :         {
     277            0 :                 if (event_store_ptr_ != nullptr)
     278            0 :                         return event_store_ptr_->BuildStatisticsString();
     279              :                 else
     280            0 :                         return "-1";
     281              :         }
     282              : 
     283              :         // lots of cool stuff that we can do here
     284              :         // - report on the number of fragments received and the number
     285              :         //   of events built (in the current or previous run
     286              :         // - report on the number of incomplete events in the EventStore
     287              :         //   (if running)
     288            0 :         std::string tmpString;
     289            0 :         if (event_store_ptr_ != nullptr)
     290              :         {
     291            0 :                 tmpString.append(app_name + " run number = " + std::to_string(event_store_ptr_->runID()) + ".\n");
     292              :         }
     293            0 :         tmpString.append("Command \"" + which + "\" is not currently supported.");
     294            0 :         return tmpString;
     295            0 : }
        

Generated by: LCOV version 2.0-1