LCOV - code coverage report
Current view: top level - artdaq/ArtModules/detail - ListenTransferWrapper.cc (source / functions) Coverage Total Hit
Test: artdaq.info.cleaned Lines: 0.0 % 135 0
Test Date: 2025-09-04 00:45:34 Functions: 0.0 % 15 0

            Line data    Source code
       1              : #include "artdaq/DAQdata/Globals.hh"
       2              : #define TRACE_NAME "ListenTransferWrapper"
       3              : 
       4              : #include "artdaq-core/Data/Fragment.hh"
       5              : #include "artdaq-core/Utilities/ExceptionHandler.hh"
       6              : #include "artdaq-core/Utilities/TimeUtils.hh"
       7              : #include "artdaq/ArtModules/detail/ListenTransferWrapper.hh"
       8              : #include "artdaq/DAQdata/NetMonHeader.hh"
       9              : #include "artdaq/TransferPlugins/MakeTransferPlugin.hh"
      10              : 
      11              : #include "cetlib/BasicPluginFactory.h"
      12              : #include "cetlib_except/exception.h"
      13              : #include "fhiclcpp/ParameterSet.h"
      14              : 
      15              : #include <csignal>
      16              : #include <iostream>
      17              : #include <limits>
      18              : #include <memory>
      19              : #include <sstream>
      20              : #include <string>
      21              : 
      22              : namespace {
      23              : volatile std::sig_atomic_t gListenSignalStatus = 0;  ///< Stores the signal number recorded by listen_signal_handler (0 until a signal has been caught)
      24              : }
      25              : 
      26              : /**
      27              :  * \brief Handle a Unix signal by recording its number in gListenSignalStatus
      28              :  * \param signal Signal to handle
                         *
                         * The handler does nothing but store the value; receiveMessage() polls
                         * gListenSignalStatus and returns once it is non-zero.
      29              :  */
      30            0 : void listen_signal_handler(int signal)
      31              : {
      32            0 :         gListenSignalStatus = signal;
      33            0 : }
      34              : 
                        /**
                         * \brief ListenTransferWrapper Constructor
                         * \param pset ParameterSet used to configure the wrapper; it is also copied into pset_ and passed to MakeTransferPlugin
                         *
                         * Keys read directly from pset, with the default used when a key is absent:
                         *   "timeoutInUsecs" (size_t, 100000): timeout passed to each receiveFragment call
                         *   "maxEventsBeforeInit" (size_t, 5): threshold checked in receiveMessage before an Init fragment has been seen
                         *   "allowedFragmentTypes" (vector<int>, {226, 227, 229}): fragment types accepted by checkIntegrity
                         *   "dispatcherConnectTimeout" (double, 0): stored in runningStateTimeout_
                         *   "dispatcherConnectRetryInterval_us" (size_t, 1000000): stored in runningStateInterval_us_ and clamped to [1000, 30000000]
                         *   "quitOnFragmentIntegrityProblem" (bool, true): whether checkIntegrity throws when a check fails
                         *   "allowMultipleRuns" (bool, true): stored in multi_run_mode_
                         *   "metrics" (ParameterSet, empty): passed to metricMan->initialize when metricMan is set
                         *
                         * The constructor also installs listen_signal_handler as the SIGINT handler.
                         */
      35            0 : artdaq::ListenTransferWrapper::ListenTransferWrapper(const fhicl::ParameterSet& pset)
      36            0 :     : timeoutInUsecs_(pset.get<std::size_t>("timeoutInUsecs", 100000))
      37            0 :     , last_received_data_()
      38            0 :     , last_report_(std::chrono::steady_clock::now())
      39            0 :     , transfer_(nullptr)
      40            0 :     , pset_(pset)
      41            0 :     , maxEventsBeforeInit_(pset.get<std::size_t>("maxEventsBeforeInit", 5))
      42            0 :     , allowedFragmentTypes_(pset.get<std::vector<int>>("allowedFragmentTypes", {226, 227, 229}))
      43            0 :     , runningStateTimeout_(pset.get<double>("dispatcherConnectTimeout", 0))
      44            0 :     , runningStateInterval_us_(pset.get<size_t>("dispatcherConnectRetryInterval_us", 1000000))
      45            0 :     , quitOnFragmentIntegrityProblem_(pset.get<bool>("quitOnFragmentIntegrityProblem", true))
      46            0 :     , multi_run_mode_(pset.get<bool>("allowMultipleRuns", true))
      47              : {
                                // Route SIGINT (Ctrl-C) to the handler so that receiveMessage can stop polling.
      48            0 :         std::signal(SIGINT, listen_signal_handler);
      49              : 
                                // Metrics setup: any exception is handed to ExceptionHandler with
                                // ExceptionHandlerRethrow::no instead of being propagated from here.
      50              :         try
      51              :         {
      52            0 :                 if (metricMan)
      53              :                 {
      54            0 :                         metricMan->initialize(pset.get<fhicl::ParameterSet>("metrics", fhicl::ParameterSet()), "Online Monitor");
      55            0 :                         metricMan->do_start();
      56              :                 }
      57              :         }
      58            0 :         catch (...)
      59              :         {
      60            0 :                 artdaq::ExceptionHandler(
      61              :                     artdaq::ExceptionHandlerRethrow::no,
      62              :                     "ListenTransferWrapper: could not configure metrics");
      63            0 :         }
      64              : 
                                // Create the receiving transfer plugin from the "transfer_plugin" entry of pset_.
                                // NOTE(review): a failure is handed to ExceptionHandler with ExceptionHandlerRethrow::no;
                                // if that call returns normally, transfer_ is still nullptr after construction, while
                                // receiveMessage dereferences transfer_ without a null check - confirm this is intended.
      65              :         try
      66              :         {
      67            0 :                 transfer_ = artdaq::MakeTransferPlugin(pset_, "transfer_plugin", artdaq::TransferInterface::Role::kReceive);
      68              :         }
      69            0 :         catch (...)
      70              :         {
      71            0 :                 artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, "ListenTransferWrapper: failure in call to MakeTransferPlugin");
      72            0 :         }
      73              : 
      74              :         // Clamp possible values
                                // (the retry interval is forced into the range 1000 us .. 30000000 us, with a warning when it is changed)
      75            0 :         if (runningStateInterval_us_ < 1000)
      76              :         {
      77            0 :                 TLOG(TLVL_WARNING) << "Invalid value " << runningStateInterval_us_ << " us detected for dispatcherConnectRetryInterval_us. Setting to 1000 us";
      78            0 :                 runningStateInterval_us_ = 1000;
      79              :         }
      80            0 :         if (runningStateInterval_us_ > 30000000)
      81              :         {
      82            0 :                 TLOG(TLVL_WARNING) << "Invalid value " << runningStateInterval_us_ << " us detected for dispatcherConnectRetryInterval_us. Setting to 30,000,000 us";
      83            0 :                 runningStateInterval_us_ = 30000000;
      84              :         }
      85            0 : }
      86              : 
                        /**
                         * \brief Poll the transfer plugin until one fragment is ready to hand to the caller
                         * \return FragmentPtrs holding the single accepted fragment, or an empty list when the wait is abandoned
                         *
                         * The list is returned empty when gListenSignalStatus becomes non-zero, when
                         * receiveFragment reports DATA_END while multi_run_mode_ is false, or when an
                         * EndOfData fragment arrives while multi_run_mode_ is false.
                         *
                         * EndOfSubrun and EndOfRun fragments are skipped. The first fragment handed back
                         * must be an Init fragment; once one has been accepted, later Init fragments are
                         * skipped and any other fragment that reaches that point is returned.
                         *
                         * Throws cet::exception when a non-Init fragment arrives before any Init fragment
                         * and more than maxEventsBeforeInit_ fragments have been counted; checkIntegrity
                         * may also throw. Exceptions raised inside a receive attempt are handed to
                         * ExceptionHandler with ExceptionHandlerRethrow::yes.
                         *
                         * NOTE(review): "initialized", "fragments_received" and "cntr" are function-local
                         * statics, so their values persist across calls and are not per-object state.
                         */
      87            0 : artdaq::FragmentPtrs artdaq::ListenTransferWrapper::receiveMessage()
      88              : {
      89            0 :         artdaq::FragmentPtrs fragmentPtrs;
      90            0 :         bool receivedFragment = false;
                                // Set once an Init fragment has been accepted; cleared again on DATA_END in multi-run mode.
      91              :         static bool initialized = false;
                                // Counts every fragment received, including ones that are skipped further down.
      92              :         static size_t fragments_received = 0;
      93              : 
                                // Outer loop: one iteration per received fragment, until one is accepted or a signal is recorded.
      94            0 :         while (gListenSignalStatus == 0)
      95              :         {
      96            0 :                 receivedFragment = false;
      97            0 :                 auto fragmentPtr = std::make_unique<artdaq::Fragment>();
      98              : 
                                        // Inner loop: repeat receiveFragment until it delivers a fragment into *fragmentPtr.
      99            0 :                 while (!receivedFragment)
     100              :                 {
     101            0 :                         if (gListenSignalStatus != 0)
     102              :                         {
     103            0 :                                 TLOG(TLVL_INFO) << "Ctrl-C appears to have been hit";
     104            0 :                                 return fragmentPtrs;
     105              :                         }
     106              : 
     107              :                         try
     108              :                         {
     109            0 :                                 auto result = transfer_->receiveFragment(*fragmentPtr, timeoutInUsecs_);
     110              : 
                                                        // Any status at or above RECV_SUCCESS is treated as a received fragment.
     111            0 :                                 if (result >= artdaq::TransferInterface::RECV_SUCCESS)
     112              :                                 {
     113            0 :                                         receivedFragment = true;
     114            0 :                                         fragments_received++;
     115              : 
                                                                // Ordinal suffix for the log line below.
                                                                // NOTE(review): only cntr % 10 is examined, so 11, 12 and 13 are
                                                                // logged as "11-st", "12-nd" and "13-rd".
     116              :                                         static size_t cntr = 0;
     117            0 :                                         auto mod = ++cntr % 10;
     118            0 :                                         auto suffix = "-th";
     119            0 :                                         if (mod == 1)
     120              :                                         {
     121            0 :                                                 suffix = "-st";
     122              :                                         }
     123            0 :                                         if (mod == 2)
     124              :                                         {
     125            0 :                                                 suffix = "-nd";
     126              :                                         }
     127            0 :                                         if (mod == 3)
     128              :                                         {
     129            0 :                                                 suffix = "-rd";
     130              :                                         }
     131            0 :                                         TLOG(TLVL_INFO) << "Received " << cntr << suffix << " event, "
     132            0 :                                                         << "seqID == " << fragmentPtr->sequenceID()
     133            0 :                                                         << ", type == " << fragmentPtr->typeString();
     134            0 :                                         last_received_data_ = std::chrono::steady_clock::now();
                                                                // receivedFragment is true now, so this continue ends the inner loop via its condition.
     135            0 :                                         continue;
     136            0 :                                 }
     137            0 :                                 if (result == artdaq::TransferInterface::DATA_END)
     138              :                                 {
                                                                // NOTE(review): the message says "Shutting down." but in multi-run mode the
                                                                // code keeps polling and only clears "initialized", so that a new Init
                                                                // fragment is required before further fragments are handed back.
     139            0 :                                         TLOG(TLVL_ERROR) << "Transfer Plugin disconnected or other unrecoverable error. Shutting down.";
     140            0 :                                         if (multi_run_mode_)
     141              :                                         {
     142            0 :                                                 initialized = false;
     143            0 :                                                 continue;
     144              :                                         }
     145            0 :                                         return fragmentPtrs;
     146              :                                 }
     147              :                                 else
     148              :                                 {
                                                                // Every other status is reported as a timeout. The message is logged at a debug
                                                                // level unless more than one second has passed both since the last warning and
                                                                // since data was last received, in which case it is raised to a warning.
     149            0 :                                         auto tlvl = TLVL_DEBUG + 33;
     150            0 :                                         if (artdaq::TimeUtils::GetElapsedTime(last_report_) > 1.0 && artdaq::TimeUtils::GetElapsedTime(last_received_data_) > 1.0)
     151              :                                         {
     152            0 :                                                 tlvl = TLVL_WARNING;
     153            0 :                                                 last_report_ = std::chrono::steady_clock::now();
     154              :                                         }
     155              : 
     156            0 :                                         auto last_received_milliseconds = artdaq::TimeUtils::GetElapsedTimeMilliseconds(last_received_data_);
     157              : 
     158              :                                         // 02-Jun-2018, KAB: added status/result printout
     159              :                                         // to-do: add another else clause that explicitly checks for RECV_TIMEOUT
     160            0 :                                         TLOG(tlvl) << "Timeout occurred in call to transfer_->receiveFragmentFrom; will try again"
     161            0 :                                                    << ", status = " << result << ", last received data " << last_received_milliseconds << " ms ago.";
     162              :                                 }
     163              :                         }
     164            0 :                         catch (...)
     165              :                         {
     166            0 :                                 artdaq::ExceptionHandler(
     167              :                                     artdaq::ExceptionHandlerRethrow::yes,
     168              :                                     "Problem receiving data in ListenTransferWrapper::receiveMessage");
     169            0 :                         }
     170              :                 }
     171              : 
                                        // These fragment types are dropped before the integrity check is run.
     172            0 :                 if (fragmentPtr->type() == artdaq::Fragment::EndOfSubrunFragmentType || fragmentPtr->type() == artdaq::Fragment::EndOfRunFragmentType)
     173              :                 {
     174              :                         // Ignore these for now
     175            0 :                         continue;
     176              :                 }
     177              : 
                                        // EndOfData: keep listening in multi-run mode, otherwise return the (empty) list.
     178            0 :                 if (fragmentPtr->type() == artdaq::Fragment::EndOfDataFragmentType)
     179              :                 {
     180              :                         // if (monitorRegistered_)
     181              :                         //{
     182              :                         //      unregisterMonitor();
     183              :                         // }
     184            0 :                         if (multi_run_mode_)
     185              :                         {
     186              :                                 //      initialized = false;
     187            0 :                                 continue;
     188              :                         }
     189              : 
     190            0 :                         return fragmentPtrs;
     191              :                 }
     192              : 
                                        // May throw when quitOnFragmentIntegrityProblem_ is set; otherwise a failed check is only logged.
     193            0 :                 checkIntegrity(*fragmentPtr);
     194              : 
                                        // Hand back the first Init fragment, or any non-Init fragment once initialized.
     195            0 :                 if (initialized || fragmentPtr->type() == artdaq::Fragment::InitFragmentType)
     196              :                 {
     197            0 :                         if (initialized && fragmentPtr->type() == artdaq::Fragment::InitFragmentType)
     198              :                         {
     199              :                                 // Ignore reinit for now, maybe handle in ArtdaqInputHelper later
     200            0 :                                 continue;
     201              :                         }
     202            0 :                         initialized = true;
     203            0 :                         fragmentPtrs.push_back(std::move(fragmentPtr));
     204            0 :                         break;
     205              :                 }
     206              : 
                                        // Reached only for a non-Init fragment received before any Init fragment.
                                        // NOTE(review): fragments_received also includes fragments skipped above
                                        // (EndOfSubrun, EndOfRun, EndOfData), so the limit is on all received fragments.
     207            0 :                 if (fragments_received > maxEventsBeforeInit_)
     208              :                 {
     209            0 :                         throw cet::exception("ListenTransferWrapper") << "First " << maxEventsBeforeInit_ << " events received did not include the \"Init\" event containing necessary info for art; exiting...";  // NOLINT(cert-err60-cpp)
     210              :                 }
     211            0 :         }
     212              : 
     213            0 :         return fragmentPtrs;
     214            0 : }
     215              : 
                        /**
                         * \brief Receive fragments through receiveMessage and group them by fragment type
                         * \return Shared pointer to an ArtdaqEvent whose "fragments" map holds, per fragment type,
                         *         the fragments returned by one receiveMessage call (the map is left untouched when that list is empty)
                         *
                         * Exceptions thrown by receiveMessage are not caught here.
                         */
     216              : std::shared_ptr<ArtdaqEvent>
     217            0 : artdaq::ListenTransferWrapper::receiveMessages()
     218              : {
     219            0 :         auto output = std::make_shared<ArtdaqEvent>();
     220              : 
     221            0 :         auto ptrs = receiveMessage();
     222            0 :         for (auto& ptr : ptrs)
     223              :         {
     224            0 :                 auto fragType = ptr->type();
                                        // NOTE(review): fragPtr takes over the raw pointer via release() and is never
                                        // deleted in this function; the Fragment contents are moved into the container
                                        // below, but the released heap object itself looks leaked - confirm.
                                        // Assuming ptr is a std::unique_ptr, the reset(nullptr) after release() has no effect.
     225            0 :                 auto fragPtr = ptr.release();
     226            0 :                 ptr.reset(nullptr);
     227              : 
                                        // Create the per-type container the first time a fragment type is seen.
     228            0 :                 if (output->fragments.count(fragType) == 0u)
     229              :                 {
     230            0 :                         output->fragments[fragType] = std::make_unique<artdaq::Fragments>();
     231              :                 }
     232              : 
     233            0 :                 output->fragments[fragType]->emplace_back(std::move(*fragPtr));
     234              :         }
     235              : 
     236            0 :         return output;
     237            0 : }
     238              : 
                        /**
                         * \brief Check a received fragment for a consistent size and an allowed type
                         * \param fragment Fragment to check
                         *
                         * Two checks are made, in this order:
                         *   1. sizeBytes() must equal the raw fragment header size plus sizeof(NetMonHeader)
                         *      plus the payload size (dataEndBytes() - dataBeginBytes()).
                         *   2. type() must appear in allowedFragmentTypes_.
                         *
                         * The first failing check is logged at error level. If quitOnFragmentIntegrityProblem_
                         * is set, a cet::exception carrying the same message is thrown; otherwise the function
                         * returns without running the remaining check.
                         *
                         * NOTE(review): std::find is used below, but <algorithm> is not among the includes
                         * visible in this file; presumably it is pulled in transitively - confirm.
                         */
     239            0 : void artdaq::ListenTransferWrapper::checkIntegrity(
     240              :     const artdaq::Fragment& fragment) const
     241              : {
                                // Size of the raw fragment header in bytes (word count times word size).
     242            0 :         const size_t artdaqheader = artdaq::detail::RawFragmentHeader::num_words() *
     243              :                                     sizeof(artdaq::detail::RawFragmentHeader::RawDataType);
     244            0 :         const auto payload = static_cast<size_t>(fragment.dataEndBytes() - fragment.dataBeginBytes());
                                // The metadata is always taken to be one NetMonHeader, whatever the fragment type.
     245            0 :         const size_t metadata = sizeof(artdaq::NetMonHeader);
     246            0 :         const size_t totalsize = fragment.sizeBytes();
     247              : 
     248            0 :         const auto type = static_cast<size_t>(fragment.type());
     249              : 
                                // Check 1: the three size components must add up to the reported total size.
     250            0 :         if (totalsize != artdaqheader + metadata + payload)
     251              :         {
     252            0 :                 std::stringstream errmsg;
     253            0 :                 errmsg << "Error: artdaq fragment of type " << fragment.typeString() << ", sequence ID " << fragment.sequenceID() << " has internally inconsistent measures of its size, signalling data corruption: in bytes,"
     254            0 :                        << " total size = " << totalsize << ", artdaq fragment header = " << artdaqheader << ", metadata = " << metadata << ", payload = " << payload;
     255              : 
     256            0 :                 TLOG(TLVL_ERROR) << errmsg.str();
     257              : 
     258            0 :                 if (quitOnFragmentIntegrityProblem_)
     259              :                 {
     260            0 :                         throw cet::exception("ListenTransferWrapper") << errmsg.str();  // NOLINT(cert-err60-cpp)
     261              :                 }
     262              : 
     263            0 :                 return;
     264            0 :         }
     265              : 
                                // Check 2: the fragment type must be one of the configured allowed types.
     266            0 :         auto findloc = std::find(allowedFragmentTypes_.begin(), allowedFragmentTypes_.end(), static_cast<int>(type));
     267              : 
     268            0 :         if (findloc == allowedFragmentTypes_.end())
     269              :         {
     270            0 :                 std::stringstream errmsg;
     271            0 :                 errmsg << "Error: artdaq fragment appears to have type "
     272            0 :                        << type << ", not found in the allowed fragment types list";
     273              : 
     274            0 :                 TLOG(TLVL_ERROR) << errmsg.str();
     275            0 :                 if (quitOnFragmentIntegrityProblem_)
     276              :                 {
     277            0 :                         throw cet::exception("ListenTransferWrapper") << errmsg.str();  // NOLINT(cert-err60-cpp)
     278              :                 }
     279              : 
     280            0 :                 return;
     281            0 :         }
     282              : }
     283              : 
                        /**
                         * \brief ListenTransferWrapper Destructor
                         *
                         * Calls artdaq::Globals::CleanUpGlobals(). The SIGINT handler installed by the
                         * constructor is not restored here.
                         */
     284            0 : artdaq::ListenTransferWrapper::~ListenTransferWrapper()
     285              : {
     286            0 :         artdaq::Globals::CleanUpGlobals();
     287            0 : }
        

Generated by: LCOV version 2.0-1