LCOV - code coverage report
Current view: top level - artdaq/ArtModules/detail - TransferWrapper.cc (source / functions) Coverage Total Hit
Test: artdaq.info.cleaned Lines: 0.0 % 225 0
Test Date: 2025-09-04 00:45:34 Functions: 0.0 % 35 0

            Line data    Source code
       1              : #include "TRACE/tracemf.h"  // Pre-empt TRACE/trace.h from Fragment.hh.
       2              : #include "artdaq-core/Data/Fragment.hh"
       3              : 
       4              : #define TRACE_NAME "TransferWrapper"
       5              : 
       6              : #include "artdaq/ArtModules/detail/TransferWrapper.hh"
       7              : 
       8              : #include "artdaq-core/Utilities/ExceptionHandler.hh"
       9              : #include "artdaq-core/Utilities/TimeUtils.hh"
      10              : #include "artdaq/DAQdata/NetMonHeader.hh"
      11              : #include "artdaq/ExternalComms/MakeCommanderPlugin.hh"
      12              : #include "artdaq/TransferPlugins/MakeTransferPlugin.hh"
      13              : 
      14              : #include "cetlib_except/exception.h"
      15              : #include "fhiclcpp/ParameterSet.h"
      16              : 
      17              : #include <csignal>
      18              : #include <iostream>
      19              : #include <limits>
      20              : #include <memory>
      21              : #include <sstream>
      22              : #include <string>
      23              : 
      24              : namespace {
      25              : volatile std::sig_atomic_t gSignalStatus = 0;  ///< Stores singal from signal handler
      26              : }
      27              : 
      28              : /**
      29              :  * \brief Handle a Unix signal
      30              :  * \param signal Signal to handle
      31              :  */
      32            0 : void signal_handler(int signal)
      33              : {
      34            0 :         gSignalStatus = signal;
      35            0 : }
      36              : 
      37            0 : artdaq::TransferWrapper::TransferWrapper(const fhicl::ParameterSet& pset)
      38            0 :     : timeoutInUsecs_(pset.get<std::size_t>("timeoutInUsecs", 100000))
      39            0 :     , last_received_data_()
      40            0 :     , last_report_(std::chrono::steady_clock::now())
      41            0 :     , transfer_(nullptr)
      42            0 :     , commander_(nullptr)
      43            0 :     , pset_(pset)
      44            0 :     , dispatcherHost_(pset.get<std::string>("dispatcherHost", "localhost"))
      45            0 :     , dispatcherPort_(pset.get<std::string>("dispatcherPort", "5266"))
      46            0 :     , serverUrl_(pset.get<std::string>("server_url", "http://" + dispatcherHost_ + ":" + dispatcherPort_ + "/RPC2"))
      47            0 :     , maxEventsBeforeInit_(pset.get<std::size_t>("maxEventsBeforeInit", 5))
      48            0 :     , allowedFragmentTypes_(pset.get<std::vector<int>>("allowedFragmentTypes", {static_cast<int>(artdaq::Fragment::DataFragmentType),
      49              :                                                                                 static_cast<int>(artdaq::Fragment::InitFragmentType),
      50              :                                                                                 static_cast<int>(artdaq::Fragment::EndOfRunFragmentType),
      51              :                                                                                 static_cast<int>(artdaq::Fragment::EndOfSubrunFragmentType),
      52              :                                                                                 static_cast<int>(artdaq::Fragment::RunDataFragmentType),
      53              :                                                                                 static_cast<int>(artdaq::Fragment::SubrunDataFragmentType)}))
      54            0 :     , runningStateTimeout_(pset.get<double>("dispatcherConnectTimeout", 0))
      55            0 :     , runningStateInterval_us_(pset.get<size_t>("dispatcherConnectRetryInterval_us", 1000000))
      56            0 :     , quitOnFragmentIntegrityProblem_(pset.get<bool>("quitOnFragmentIntegrityProblem", true))
      57            0 :     , multi_run_mode_(pset.get<bool>("multi_run_mode", false))
      58            0 :     , monitorRegistered_(false)
      59              : {
      60            0 :         std::signal(SIGINT, signal_handler);
      61              : 
      62              :         try
      63              :         {
      64            0 :                 if (metricMan)
      65              :                 {
      66            0 :                         metricMan->initialize(pset.get<fhicl::ParameterSet>("metrics", fhicl::ParameterSet()), "Online Monitor");
      67            0 :                         metricMan->do_start();
      68              :                 }
      69              :         }
      70            0 :         catch (...)
      71              :         {
      72            0 :                 ExceptionHandler(ExceptionHandlerRethrow::no, "TransferWrapper: could not configure metrics");
      73            0 :         }
      74              : 
      75              :         // Clamp possible values
      76            0 :         if (runningStateInterval_us_ < 1000)
      77              :         {
      78            0 :                 TLOG(TLVL_WARNING) << "Invalid value " << runningStateInterval_us_ << " us detected for dispatcherConnectRetryInterval_us. Setting to 1000 us";
      79            0 :                 runningStateInterval_us_ = 1000;
      80              :         }
      81            0 :         if (runningStateInterval_us_ > 30000000)
      82              :         {
      83            0 :                 TLOG(TLVL_WARNING) << "Invalid value " << runningStateInterval_us_ << " us detected for dispatcherConnectRetryInterval_us. Setting to 30,000,000 us";
      84            0 :                 runningStateInterval_us_ = 30000000;
      85              :         }
      86              : 
      87            0 :         fhicl::ParameterSet new_pset(pset);
      88            0 :         if (!new_pset.has_key("server_url"))
      89              :         {
      90            0 :                 new_pset.put<std::string>("server_url", serverUrl_);
      91              :         }
      92              : 
      93            0 :         artdaq::Commandable c;
      94            0 :         commander_ = MakeCommanderPlugin(new_pset, c);
      95            0 : }
      96              : 
      97            0 : artdaq::FragmentPtrs artdaq::TransferWrapper::receiveMessage()
      98              : {
      99            0 :         artdaq::FragmentPtrs fragmentPtrs;
     100            0 :         bool receivedFragment = false;
     101              :         static bool initialized = false;
     102              :         static size_t fragments_received = 0;
     103              : 
     104            0 :         while (gSignalStatus == 0)
     105              :         {
     106            0 :                 receivedFragment = false;
     107            0 :                 auto fragmentPtr = std::make_unique<artdaq::Fragment>();
     108              : 
     109            0 :                 while (!receivedFragment)
     110              :                 {
     111            0 :                         if (gSignalStatus != 0)
     112              :                         {
     113            0 :                                 TLOG(TLVL_INFO) << "Ctrl-C appears to have been hit";
     114            0 :                                 return fragmentPtrs;
     115              :                         }
     116            0 :                         if (!monitorRegistered_)
     117              :                         {
     118            0 :                                 registerMonitor();
     119            0 :                                 if (!monitorRegistered_)
     120              :                                 {
     121            0 :                                         return fragmentPtrs;
     122              :                                 }
     123              :                         }
     124              : 
     125              :                         try
     126              :                         {
     127            0 :                                 auto result = transfer_->receiveFragment(*fragmentPtr, timeoutInUsecs_);
     128              : 
     129            0 :                                 if (result >= artdaq::TransferInterface::RECV_SUCCESS)
     130              :                                 {
     131            0 :                                         receivedFragment = true;
     132            0 :                                         fragments_received++;
     133              : 
     134              :                                         static size_t cntr = 0;
     135            0 :                                         auto mod = ++cntr % 10;
     136            0 :                                         auto suffix = "-th";
     137            0 :                                         if (mod == 1)
     138              :                                         {
     139            0 :                                                 suffix = "-st";
     140              :                                         }
     141            0 :                                         if (mod == 2)
     142              :                                         {
     143            0 :                                                 suffix = "-nd";
     144              :                                         }
     145            0 :                                         if (mod == 3)
     146              :                                         {
     147            0 :                                                 suffix = "-rd";
     148              :                                         }
     149            0 :                                         TLOG(TLVL_INFO) << "Received " << cntr << suffix << " event, "
     150            0 :                                                         << "seqID == " << fragmentPtr->sequenceID()
     151            0 :                                                         << ", type == " << fragmentPtr->typeString();
     152            0 :                                         last_received_data_ = std::chrono::steady_clock::now();
     153            0 :                                         continue;
     154            0 :                                 }
     155            0 :                                 if (result == artdaq::TransferInterface::DATA_END)
     156              :                                 {
     157            0 :                                         TLOG(TLVL_ERROR) << "Transfer Plugin disconnected or other unrecoverable error. Shutting down.";
     158            0 :                                         return fragmentPtrs;
     159              :                                 }
     160              :                                 else
     161              :                                 {
     162            0 :                                         auto tlvl = TLVL_DEBUG + 33;
     163            0 :                                         if (artdaq::TimeUtils::GetElapsedTime(last_report_) > 1.0 && artdaq::TimeUtils::GetElapsedTime(last_received_data_) > 1.0)
     164              :                                         {
     165            0 :                                                 tlvl = TLVL_WARNING;
     166            0 :                                                 last_report_ = std::chrono::steady_clock::now();
     167              :                                         }
     168              : 
     169            0 :                                         auto last_received_milliseconds = artdaq::TimeUtils::GetElapsedTimeMilliseconds(last_received_data_);
     170              : 
     171              :                                         // 02-Jun-2018, KAB: added status/result printout
     172              :                                         // to-do: add another else clause that explicitly checks for RECV_TIMEOUT
     173            0 :                                         TLOG(tlvl) << "Timeout occurred in call to transfer_->receiveFragmentFrom; will try again"
     174            0 :                                                    << ", status = " << result << ", last received data " << last_received_milliseconds << " ms ago.";
     175              :                                 }
     176              :                         }
     177            0 :                         catch (...)
     178              :                         {
     179            0 :                                 ExceptionHandler(ExceptionHandlerRethrow::yes,
     180              :                                                  "Problem receiving data in TransferWrapper::receiveMessage");
     181            0 :                         }
     182              :                 }
     183              : 
     184            0 :                 if (fragmentPtr->type() == artdaq::Fragment::EndOfDataFragmentType)
     185              :                 {
     186            0 :                         if (!multi_run_mode_)
     187              :                         {
     188            0 :                                 TLOG(TLVL_DEBUG + 32) << "Received shutdown message, returning";
     189            0 :                                 fragmentPtrs.push_back(std::move(fragmentPtr));
     190              :                         }
     191            0 :                         return fragmentPtrs;
     192              :                 }
     193              : 
     194            0 :                 checkIntegrity(*fragmentPtr);
     195              : 
     196            0 :                 if (initialized || fragmentPtr->type() == artdaq::Fragment::InitFragmentType)
     197              :                 {
     198            0 :                         initialized = true;
     199            0 :                         fragmentPtrs.push_back(std::move(fragmentPtr));
     200            0 :                         break;
     201              :                 }
     202              : 
     203            0 :                 if (fragments_received > maxEventsBeforeInit_)
     204              :                 {
     205            0 :                         throw cet::exception("TransferWrapper") << "First " << maxEventsBeforeInit_ << " events received did not include the \"Init\" event containing necessary info for art; exiting...";  // NOLINT(cert-err60-cpp)
     206              :                 }
     207            0 :         }
     208              : 
     209            0 :         return fragmentPtrs;
     210            0 : }
     211              : 
     212            0 : std::shared_ptr<ArtdaqEvent> artdaq::TransferWrapper::receiveMessages()
     213              : {
     214            0 :         std::shared_ptr<ArtdaqEvent> output = std::make_shared<ArtdaqEvent>();
     215              : 
     216            0 :         auto ptrs = receiveMessage();
     217            0 :         for (auto& ptr : ptrs)
     218              :         {
     219            0 :                 auto fragType = ptr->type();
     220            0 :                 auto fragPtr = ptr.release();
     221            0 :                 ptr.reset(nullptr);
     222              : 
     223            0 :                 if (output->fragments.count(fragType) == 0u)
     224              :                 {
     225            0 :                         output->fragments[fragType] = std::make_unique<artdaq::Fragments>();
     226              :                 }
     227              : 
     228            0 :                 output->fragments[fragType]->emplace_back(std::move(*fragPtr));
     229              :         }
     230              : 
     231            0 :         return output;
     232            0 : }
     233              : 
     234            0 : void artdaq::TransferWrapper::checkIntegrity(const artdaq::Fragment& fragment) const
     235              : {
     236            0 :         const size_t artdaqheader = artdaq::detail::RawFragmentHeader::num_words() *
     237              :                                     sizeof(artdaq::detail::RawFragmentHeader::RawDataType);
     238            0 :         const auto payload = static_cast<size_t>(fragment.dataEndBytes() - fragment.dataBeginBytes());
     239            0 :         const size_t metadata = sizeof(artdaq::NetMonHeader);
     240            0 :         const size_t totalsize = fragment.sizeBytes();
     241              : 
     242            0 :         const auto type = static_cast<size_t>(fragment.type());
     243              : 
     244            0 :         if (totalsize != artdaqheader + metadata + payload)
     245              :         {
     246            0 :                 std::stringstream errmsg;
     247            0 :                 errmsg << "Error: artdaq fragment of type " << fragment.typeString() << ", sequence ID " << fragment.sequenceID() << " has internally inconsistent measures of its size, signalling data corruption: in bytes,"
     248            0 :                        << " total size = " << totalsize << ", artdaq fragment header = " << artdaqheader << ", metadata = " << metadata << ", payload = " << payload;
     249              : 
     250            0 :                 TLOG(TLVL_ERROR) << errmsg.str();
     251              : 
     252            0 :                 if (quitOnFragmentIntegrityProblem_)
     253              :                 {
     254            0 :                         throw cet::exception("TransferWrapper") << errmsg.str();  // NOLINT(cert-err60-cpp)
     255              :                 }
     256              : 
     257            0 :                 return;
     258            0 :         }
     259              : 
     260            0 :         auto findloc = std::find(allowedFragmentTypes_.begin(), allowedFragmentTypes_.end(), static_cast<int>(type));
     261              : 
     262            0 :         if (findloc == allowedFragmentTypes_.end())
     263              :         {
     264            0 :                 std::stringstream errmsg;
     265            0 :                 errmsg << "Error: artdaq fragment appears to have type "
     266            0 :                        << type << ", not found in the allowed fragment types list";
     267              : 
     268            0 :                 TLOG(TLVL_ERROR) << errmsg.str();
     269            0 :                 if (quitOnFragmentIntegrityProblem_)
     270              :                 {
     271            0 :                         throw cet::exception("TransferWrapper") << errmsg.str();  // NOLINT(cert-err60-cpp)
     272              :                 }
     273              : 
     274            0 :                 return;
     275            0 :         }
     276              : }
     277              : 
     278            0 : void artdaq::TransferWrapper::registerMonitor()
     279              : {
     280              :         try
     281              :         {
     282            0 :                 transfer_.reset(nullptr);
     283            0 :                 transfer_ = MakeTransferPlugin(pset_, "transfer_plugin", TransferInterface::Role::kReceive);
     284              :         }
     285            0 :         catch (...)
     286              :         {
     287            0 :                 ExceptionHandler(ExceptionHandlerRethrow::yes,
     288              :                                  "TransferWrapper: failure in call to MakeTransferPlugin");
     289            0 :         }
     290              : 
     291            0 :         auto start = std::chrono::steady_clock::now();
     292            0 :         auto sts = getDispatcherStatus();
     293            0 :         while (sts != "Running" && (runningStateTimeout_ == 0 || TimeUtils::GetElapsedTime(start) < runningStateTimeout_))
     294              :         {
     295            0 :                 TLOG(TLVL_DEBUG + 32) << "Dispatcher state: " << sts;
     296            0 :                 if (gSignalStatus != 0)
     297              :                 {
     298            0 :                         TLOG(TLVL_INFO) << "Ctrl-C appears to have been hit";
     299            0 :                         return;
     300              :                 }
     301            0 :                 TLOG(TLVL_INFO) << "Waited " << std::fixed << std::setprecision(2) << TimeUtils::GetElapsedTime(start) << " s / " << runningStateTimeout_ << " s for Dispatcher to enter the Running state (state=" << sts << ")";
     302            0 :                 usleep(runningStateInterval_us_);
     303            0 :                 sts = getDispatcherStatus();
     304              :         }
     305            0 :         if (sts != "Running")
     306              :         {
     307            0 :                 return;
     308              :         }
     309              : 
     310            0 :         auto dispatcherConfig = pset_.get<fhicl::ParameterSet>("dispatcher_config");
     311              : 
     312            0 :         int retry = 3;
     313              : 
     314            0 :         while (retry > 0)
     315              :         {
     316            0 :                 label_ = dispatcherConfig.get<std::string>("unique_label") + "T" + std::to_string(time(0));
     317            0 :                 dispatcherConfig.erase("unique_label");
     318            0 :                 dispatcherConfig.put<std::string>("unique_label", label_);
     319            0 :                 TLOG(TLVL_INFO) << "Attempting to register this monitor (\"" << label_
     320            0 :                                 << "\") with the dispatcher aggregator";
     321              : 
     322            0 :                 auto status = commander_->send_register_monitor(dispatcherConfig.to_string());
     323              : 
     324            0 :                 TLOG(TLVL_INFO) << "Response from dispatcher is \"" << status << "\"";
     325              : 
     326            0 :                 if (status == "Success")
     327              :                 {
     328            0 :                         monitorRegistered_ = true;
     329            0 :                         break;
     330              :                 }
     331              : 
     332            0 :                 TLOG(TLVL_WARNING) << "Error in TransferWrapper: attempt to register with dispatcher did not result in the \"Success\" response";
     333            0 :                 usleep(100000);
     334              : 
     335            0 :                 retry--;
     336            0 :         }
     337            0 : }
     338              : 
     339            0 : void artdaq::TransferWrapper::unregisterMonitor()
     340              : {
     341            0 :         if (!monitorRegistered_)
     342              :         {
     343            0 :                 TLOG(TLVL_WARNING) << "The function to unregister the monitor was called, but the monitor doesn't appear to be registered";
     344            0 :                 return;
     345              :         }
     346              : 
     347            0 :         auto start_time = std::chrono::steady_clock::now();
     348            0 :         bool waiting = true;
     349            0 :         while (artdaq::TimeUtils::GetElapsedTime(start_time) < 5.0 && waiting)
     350              :         {
     351            0 :                 std::string sts = getDispatcherStatus();
     352              : 
     353            0 :                 if (sts.empty())
     354            0 :                         return;
     355              : 
     356            0 :                 if (sts == "busy")
     357              :                 {
     358            0 :                         TLOG(TLVL_INFO) << "The Dispatcher returned \"busy\", will wait 0.5s and retry";
     359            0 :                         usleep(500000);
     360            0 :                         continue;
     361            0 :                 }
     362              : 
     363            0 :                 if (sts != "Running" && sts != "Ready")
     364              :                 {
     365            0 :                         TLOG(TLVL_WARNING) << "The Dispatcher is not in the Running or Ready state, will not attempt to unregister (state: " << sts << ")";
     366            0 :                         return;
     367              :                 }
     368            0 :                 waiting = false;
     369            0 :         }
     370            0 :         if (waiting)
     371              :         {
     372            0 :                 TLOG(TLVL_WARNING) << "A timeout occurred waiting for the Dispatcher to leave the \"busy\" state, will not attempt to unregister";
     373            0 :                 return;
     374              :         }
     375              : 
     376            0 :         int retry = 3;
     377            0 :         while (retry > 0)
     378              :         {
     379            0 :                 TLOG(TLVL_INFO) << "Requesting that this monitor (" << label_
     380            0 :                                 << ") be unregistered from the dispatcher aggregator";
     381              : 
     382            0 :                 auto status = commander_->send_unregister_monitor(label_);
     383              : 
     384            0 :                 TLOG(TLVL_INFO) << "Response from dispatcher is \"" << status << "\"";
     385              : 
     386            0 :                 if (status == "Success")
     387              :                 {
     388            0 :                         break;
     389              :                 }
     390            0 :                 else if (status == "busy")
     391              :                 {
     392            0 :                         TLOG(TLVL_DEBUG + 32) << "The Dispatcher returned \"busy\", will retry in 0.5s";
     393              :                 }
     394              :                 else
     395              :                 {
     396            0 :                         TLOG(TLVL_WARNING) << "The Dispatcher returned status " << status << " when attempting to unregister this monitor!";
     397              :                         // throw cet::exception("TransferWrapper") << "Error in TransferWrapper: attempt to unregister with dispatcher did not result in the \"Success\" response";
     398              :                 }
     399            0 :                 retry--;
     400            0 :                 usleep(500000);
     401            0 :         }
     402              : 
     403            0 :         TLOG(TLVL_INFO) << "Successfully unregistered the monitor from the Dispatcher";
     404            0 :         monitorRegistered_ = false;
     405              : }
     406              : 
     407            0 : std::string artdaq::TransferWrapper::getDispatcherStatus()
     408              : {
     409              :         try
     410              :         {
     411            0 :                 return commander_->send_status();
     412              :         }
     413            0 :         catch (std::exception const& ex)
     414              :         {
     415            0 :                 TLOG(TLVL_WARNING) << "An exception was thrown trying to collect the Dispatcher's status. Most likely cause is the application is no longer running.";
     416            0 :                 return "";
     417            0 :         }
     418              : }
     419              : 
     420            0 : artdaq::TransferWrapper::~TransferWrapper()
     421              : {
     422            0 :         if (monitorRegistered_)
     423              :         {
     424              :                 try
     425              :                 {
     426            0 :                         unregisterMonitor();
     427              :                 }
     428            0 :                 catch (...)
     429              :                 {
     430            0 :                         ExceptionHandler(ExceptionHandlerRethrow::no,
     431              :                                          "An exception occurred when trying to unregister monitor during TransferWrapper's destruction");
     432            0 :                 }
     433              :         }
     434            0 :         artdaq::Globals::CleanUpGlobals();
     435            0 : }
        

Generated by: LCOV version 2.0-1