LCOV - code coverage report
Current view: top level - artdaq/ArtModules - ArtdaqSharedMemoryService_service.cc (source / functions) Coverage Total Hit
Test: artdaq.info.cleaned Lines: 0.0 % 196 0
Test Date: 2025-09-04 00:45:34 Functions: 0.0 % 53 0

            Line data    Source code
       1              : #include "TRACE/tracemf.h"
       2              : 
       3              : #include "artdaq/ArtModules/ArtdaqSharedMemoryServiceInterface.h"
       4              : #include "artdaq/DAQdata/Globals.hh"
       5              : 
       6              : #include "artdaq-core/Core/SharedMemoryEventReceiver.hh"
       7              : #include "artdaq-core/Utilities/ExceptionHandler.hh"
       8              : 
       9              : #include "art/Framework/Services/Registry/ServiceDefinitionMacros.h"
      10              : #include "art/Framework/Services/Registry/ServiceHandle.h"
      11              : #include "fhiclcpp/types/Atom.h"
      12              : #include "fhiclcpp/types/Comment.h"
      13              : #include "fhiclcpp/types/ConfigurationTable.h"
      14              : #include "fhiclcpp/types/Name.h"
      15              : 
      16              : #include <cstdint>
      17              : #include <memory>
      18              : 
      19              : #define TRACE_NAME "ArtdaqSharedMemoryService"
      20              : // ----------------------------------------------------------------------
      21              : 
      22              : /**
      23              :  * \brief ArtdaqSharedMemoryService extends ArtdaqSharedMemoryServiceInterface.
      24              :  * It receives events from shared memory using SharedMemoryEventReceiver. It also manages the artdaq Global varaibles my_rank and app_name.
      25              :  * Users should retrieve a ServiceHandle to this class before using artdaq Globals to ensure the correct values are used.
      26              :  */
      27              : class ArtdaqSharedMemoryService : public ArtdaqSharedMemoryServiceInterface
      28              : {
      29              : public:
      30              :         /// <summary>
      31              :         /// Allowed Configuration parameters of NetMonTransportService. May be used for configuration validation
      32              :         /// </summary>
      33              :         struct Config
      34              :         {
      35              :                 /// "shared_memory_key" (Default: 0xBEE70000 + pid): Key to use when connecting to shared memory. Will default to 0xBEE70000 + getppid().
      36              :                 fhicl::Atom<uint32_t> shared_memory_key{fhicl::Name{"shared_memory_key"}, fhicl::Comment{"Key to use when connecting to shared memory. Will default to 0xBEE70000 + getppid()."}, 0xBEE70000};
      37              :                 /// "shared_memory_key" (Default: 0xCEE70000 + pid): Key to use when connecting to broadcast shared memory. Will default to 0xCEE70000 + getppid().
      38              :                 fhicl::Atom<uint32_t> broadcast_shared_memory_key{fhicl::Name{"broadcast_shared_memory_key"}, fhicl::Comment{"Key to use when connecting to broadcast shared memory. Will default to 0xCEE70000 + getppid()."}, 0xCEE70000};
      39              :                 /// "rank" (OPTIONAL) : The rank of this applicaiton, for use by non - artdaq applications running NetMonTransportService
      40              :                 fhicl::Atom<int> rank{fhicl::Name{"rank"}, fhicl::Comment{"Rank of this artdaq application. Used for data transfers"}};
      41              :                 /// "subrun_closure_threshold" (Default: 5) Minimum number of events in event ordering list before releasing a subrun/run change event
      42              :                 fhicl::Atom<size_t> subrun_closure_threshold{fhicl::Name{"subrun_closure_threshold"}, fhicl::Comment{"Minimum number of events in event ordering list before releasing a subrun/run change event"}, 1};
      43              :                 /// "safety_valve_timeout_s" (Default: 10.0): Maximum time (in s) to wait before releasing the front of the event ordering list
      44              :                 fhicl::Atom<double> safety_valve_timeout_s{fhicl::Name{"safety_valve_timeout_s"}, fhicl::Comment{"Maximum time (in s) to wait before releasing the front of the event ordering list"}, 10.0};
      45              :         };
      46              :         /// Used for ParameterSet validation (if desired)
      47              :         using Parameters = fhicl::WrappedTable<Config>;
      48              : 
      49              :         /**
      50              :          * \brief NetMonTransportService Destructor. Calls disconnect().
      51              :          */
      52              :         virtual ~ArtdaqSharedMemoryService();
      53              : 
      54              :         /**
      55              :          * \brief NetMonTransportService Constructor
      56              :          * \param pset ParameterSet used to configure NetMonTransportService and DataSenderManager. See NetMonTransportService::Config
      57              :          */
      58              :         ArtdaqSharedMemoryService(fhicl::ParameterSet const& pset, art::ActivityRegistry&);
      59              : 
      60              :         /**
      61              :          * \brief Receive event(s) from the shared memory
      62              :          * \param broadcast Whether to only attempt to receive a broadcast (broadcasts are always preferentially received over data)
      63              :          * \return Map of Fragment types retrieved from shared memory
      64              :          */
      65              :         std::shared_ptr<ArtdaqEvent> ReceiveEvent(bool broadcast) override;
      66              : 
      67              :         /**
      68              :          * \brief Get the number of events which are ready to be read
      69              :          * \return The number of events which can be read
      70              :          */
      71            0 :         size_t GetQueueSize() override { return incoming_events_->ReadReadyCount(); }
      72              : 
      73              :         /**
      74              :          * \brief Get the maximum number of events which can be stored in the shared memory
      75              :          * \return The maximum number of events which can be stored in the shared memory
      76              :          */
      77            0 :         size_t GetQueueCapacity() override { return incoming_events_->size(); }
      78              : 
      79              :         /**
      80              :          * \brief Get the ID of this art process
      81              :          * \return The ID of this art process from the shared memory segment
      82              :          */
      83            0 :         size_t GetMyId() override { return incoming_events_->GetMyId(); }
      84              : 
      85              : private:
      86              :         ArtdaqSharedMemoryService(ArtdaqSharedMemoryService const&) = delete;
      87              :         ArtdaqSharedMemoryService(ArtdaqSharedMemoryService&&) = delete;
      88              :         ArtdaqSharedMemoryService& operator=(ArtdaqSharedMemoryService const&) = delete;
      89              :         ArtdaqSharedMemoryService& operator=(ArtdaqSharedMemoryService&&) = delete;
      90              : 
      91              :         std::shared_ptr<ArtdaqEvent> ReadEventFromSharedMemory(bool broadcast);
      92              : 
      93              : private:
      94              :         std::unique_ptr<artdaq::SharedMemoryEventReceiver> incoming_events_;
      95              :         std::list<std::shared_ptr<ArtdaqEvent>> event_ordering_;
      96              :         size_t read_timeout_;
      97              :         size_t subrun_closure_threshold_{1};
      98              :         double safety_valve_timeout_s_{10.0};
      99              :         bool last_read_timeout_{false};
     100              :         bool resume_after_timeout_;
     101              :         bool printed_exit_message_{false};
     102              :         bool end_of_data_received_{false};
     103              :         bool subrun_has_events_{false};
     104              :         uint32_t current_subrun_{0};
     105              : };
     106              : 
     107            0 : DECLARE_ART_SERVICE_INTERFACE_IMPL(ArtdaqSharedMemoryService, ArtdaqSharedMemoryServiceInterface, LEGACY)
     108              : 
     109              : static fhicl::ParameterSet empty_pset;
     110              : 
     111              : // clang-format off
     112              : #define TLVL_CONSTRUCTOR    TLVL_DEBUG + 32
     113              : #define TLVL_READEVENT      TLVL_DEBUG + 33
     114              : #define TLVL_READEVENT_2    TLVL_DEBUG + 34
     115              : #define TLVL_READEVENT_3    TLVL_DEBUG + 35
     116              : #define TLVL_RECEIVEEVENT   TLVL_DEBUG + 36
     117              : #define TLVL_RECEIVEEVENT_2 TLVL_DEBUG + 37
     118              : #define TLVL_RECEIVEEVENT_3 TLVL_DEBUG + 38
     119              : #define TLVL_RECEIVEEVENT_4 TLVL_DEBUG + 39
     120              : // clang-format on
     121              : 
     122            0 : ArtdaqSharedMemoryService::ArtdaqSharedMemoryService(fhicl::ParameterSet const& pset, art::ActivityRegistry& /*unused*/)
     123            0 :     : incoming_events_(nullptr)
     124            0 :     , event_ordering_()
     125            0 :     , read_timeout_(pset.get<size_t>("read_timeout_us", static_cast<size_t>(pset.get<double>("waiting_time", 600.0) * 1000000)))
     126            0 :     , subrun_closure_threshold_(pset.get<size_t>("subrun_closure_threshold", artdaq::SharedMemoryManager::GetCatchUpFactor() + 2))  // +2, one for ESRF itself, one for extra padding to ensure that no catch-up is being performed
     127            0 :     , safety_valve_timeout_s_(pset.get<double>("safety_valve_timeout_s", 10.0))
     128            0 :     , resume_after_timeout_(pset.get<bool>("resume_after_timeout", true))
     129              : {
     130            0 :         TLOG(TLVL_CONSTRUCTOR) << "ArtdaqSharedMemoryService CONSTRUCTOR";
     131              : 
     132            0 :         incoming_events_ = std::make_unique<artdaq::SharedMemoryEventReceiver>(
     133            0 :             pset.get<int>("shared_memory_key", artdaq::Globals::SharedMemoryKey(0xEE000000, true)),
     134            0 :             pset.get<int>("broadcast_shared_memory_key", artdaq::Globals::SharedMemoryKey(0xBB000000, true)));
     135              : 
     136            0 :         char const* artapp_env = getenv("ARTDAQ_APPLICATION_NAME");
     137            0 :         std::string artapp_str;
     138            0 :         if (artapp_env != nullptr)
     139              :         {
     140            0 :                 artapp_str = std::string(artapp_env) + "_";
     141              :         }
     142              : 
     143            0 :         TLOG(TLVL_CONSTRUCTOR) << "Setting app_name";
     144            0 :         artdaq::Globals::my_art_id_ = incoming_events_->GetMyId();
     145            0 :         app_name = artapp_str + "art" + std::to_string(artdaq::Globals::my_art_id_);
     146              :         // artdaq::configureMessageFacility(app_name.c_str()); // ELF 11/20/2020: MessageFacility already configured by initialization pset
     147              : 
     148            0 :         artapp_env = getenv("ARTDAQ_RANK");
     149            0 :         if (artapp_env != nullptr && my_rank < 0)
     150              :         {
     151            0 :                 TLOG(TLVL_CONSTRUCTOR) << "Setting rank from envrionment";
     152            0 :                 my_rank = strtol(artapp_env, nullptr, 10);
     153            0 :         }
     154              :         else
     155              :         {
     156            0 :                 TLOG(TLVL_CONSTRUCTOR) << "Setting my_rank from shared memory";
     157            0 :                 my_rank = incoming_events_->GetRank();
     158              :         }
     159              : 
     160              :         try
     161              :         {
     162            0 :                 if (metricMan)
     163              :                 {
     164            0 :                         metricMan->initialize(pset.get<fhicl::ParameterSet>("metrics", fhicl::ParameterSet()), app_name);
     165            0 :                         metricMan->do_start();
     166              :                 }
     167              :         }
     168            0 :         catch (...)
     169              :         {
     170            0 :                 artdaq::ExceptionHandler(artdaq::ExceptionHandlerRethrow::no, "Error loading metrics in ArtdaqSharedMemoryService()");
     171            0 :         }
     172              : 
     173            0 :         TLOG(TLVL_INFO) << "app_name is " << app_name << ", rank " << my_rank;
     174            0 : }
     175              : 
     176            0 : ArtdaqSharedMemoryService::~ArtdaqSharedMemoryService()
     177              : {
     178            0 :         artdaq::Globals::CleanUpGlobals();
     179            0 : }
     180              : 
     181            0 : std::shared_ptr<ArtdaqEvent> ArtdaqSharedMemoryService::ReadEventFromSharedMemory(bool broadcast)
     182              : {
     183            0 :         TLOG(TLVL_READEVENT) << "ReadEventFromSharedMemory BEGIN";
     184            0 :         std::shared_ptr<ArtdaqEvent> output_event;
     185              : 
     186            0 :         while (output_event == nullptr)
     187              :         {
     188            0 :                 TLOG(TLVL_READEVENT_2) << "ReadEventFromSharedMemory: Waiting for available buffer";
     189            0 :                 bool got_event = false;
     190            0 :                 auto start_time = std::chrono::steady_clock::now();
     191            0 :                 auto read_timeout_to_use = read_timeout_ > 100000 ? 100000 : read_timeout_;
     192            0 :                 if (!resume_after_timeout_ || broadcast) read_timeout_to_use = read_timeout_;
     193            0 :                 while (!incoming_events_->IsEndOfData() && !got_event)
     194              :                 {
     195            0 :                         got_event = incoming_events_->ReadyForRead(broadcast, read_timeout_to_use);
     196            0 :                         if (!got_event && (!resume_after_timeout_ || broadcast))  // Only try broadcasts once!
     197              :                         {
     198            0 :                                 TLOG(TLVL_ERROR) << "Timeout occurred! No data received after " << read_timeout_to_use << " us. Returning empty Fragment list!";
     199            0 :                                 last_read_timeout_ = true;
     200            0 :                                 return nullptr;
     201              :                         }
     202            0 :                         if (!got_event && artdaq::TimeUtils::GetElapsedTimeMicroseconds(start_time) > read_timeout_)
     203              :                         {
     204            0 :                                 TLOG(TLVL_READEVENT_2) << "Timeout occurred! No data received after " << artdaq::TimeUtils::GetElapsedTimeMicroseconds(start_time) << " us. Retrying.";
     205            0 :                                 last_read_timeout_ = true;
     206              :                         }
     207              :                 }
     208              : 
     209            0 :                 if (incoming_events_->IsEndOfData())
     210              :                 {
     211            0 :                         if (!printed_exit_message_)
     212              :                         {
     213            0 :                                 TLOG(TLVL_INFO) << "End of Data signal received, exiting";
     214            0 :                                 printed_exit_message_ = true;
     215              :                         }
     216            0 :                         return nullptr;
     217              :                 }
     218              : 
     219            0 :                 TLOG(TLVL_READEVENT) << "ReadEventFromSharedMemory: Reading buffer header";
     220            0 :                 last_read_timeout_ = false;
     221            0 :                 output_event = std::make_shared<ArtdaqEvent>();
     222            0 :                 auto errflag = false;
     223            0 :                 auto hdrPtr = incoming_events_->ReadHeader(errflag);
     224            0 :                 if (errflag || hdrPtr == nullptr)
     225              :                 {  // Buffer was changed out from under reader!
     226            0 :                         incoming_events_->ReleaseBuffer();
     227            0 :                         continue;  // retry
     228              :                 }
     229            0 :                 output_event->header = std::make_shared<artdaq::detail::RawEventHeader>(*hdrPtr);
     230            0 :                 TLOG(TLVL_READEVENT) << "ReadEventFromSharedMemory: Getting Fragment types";
     231            0 :                 auto fragmentTypes = incoming_events_->GetFragmentTypes(errflag);
     232            0 :                 if (errflag)
     233              :                 {  // Buffer was changed out from under reader!
     234            0 :                         incoming_events_->ReleaseBuffer();
     235            0 :                         continue;  // retry
     236              :                 }
     237            0 :                 if (fragmentTypes.empty())
     238              :                 {
     239            0 :                         TLOG(TLVL_ERROR) << "Event has no Fragments! Aborting!";
     240            0 :                         incoming_events_->ReleaseBuffer();
     241            0 :                         return nullptr;
     242              :                 }
     243              : 
     244            0 :                 for (auto const& type : fragmentTypes)
     245              :                 {
     246            0 :                         TLOG(TLVL_READEVENT_3) << "ReadEventFromSharedMemory: Getting all Fragments of type " << static_cast<int>(type);
     247            0 :                         output_event->fragments[type] = incoming_events_->GetFragmentsByType(errflag, type);
     248            0 :                         if (!output_event->fragments[type])
     249              :                         {
     250            0 :                                 TLOG(TLVL_WARNING) << "Error retrieving Fragments from shared memory! (Most likely due to a buffer overwrite) Retrying...";
     251            0 :                                 incoming_events_->ReleaseBuffer();
     252            0 :                                 output_event->fragments.clear();
     253            0 :                                 continue;
     254            0 :                         }
     255              :                         // Events coming out of the EventStore are not sorted but need to be sorted by sequence ID before they can be passed to art.
     256            0 :                         std::sort(output_event->fragments[type]->begin(), output_event->fragments[type]->end(), artdaq::fragmentSequenceIDCompare);
     257              :                 }
     258            0 :                 TLOG(TLVL_READEVENT) << "ReadEventFromSharedMemory: Releasing buffer";
     259            0 :                 incoming_events_->ReleaseBuffer();
     260            0 :         }
     261              : 
     262            0 :         TLOG(TLVL_READEVENT) << "ReadEventFromSharedMemory END";
     263            0 :         return output_event;
     264            0 : }
     265              : 
     266            0 : std::shared_ptr<ArtdaqEvent> ArtdaqSharedMemoryService::ReceiveEvent(bool broadcast)
     267              : {
     268            0 :         TLOG(TLVL_RECEIVEEVENT) << "ReceiveEvent BEGIN";
     269            0 :         std::shared_ptr<ArtdaqEvent> output_event;
     270            0 :         auto start_time = std::chrono::steady_clock::now();
     271              : 
     272            0 :         while (output_event == nullptr)
     273              :         {
     274              :                 // If we experienced a timeout, or have an EndOfData event, drain any held Start/End Run/SubRun events
     275            0 :                 if (last_read_timeout_ || end_of_data_received_)
     276              :                 {
     277            0 :                         if (event_ordering_.size() > 0)
     278              :                         {
     279            0 :                                 output_event = event_ordering_.front();
     280            0 :                                 event_ordering_.pop_front();
     281            0 :                                 break;  // while(output_event == nullptr)
     282              :                         }
     283              :                         // Don't try to get more data if we have an EndOfData event
     284            0 :                         if (end_of_data_received_) { break; }
     285              :                 }
     286              : 
     287            0 :                 if (event_ordering_.size() > 0)
     288              :                 {
     289            0 :                         auto first_type = event_ordering_.front()->FirstFragmentType();
     290            0 :                         auto first_sr = event_ordering_.front()->header->subrun_id;
     291              :                         // If there is an Init Fragment, return it
     292            0 :                         if (first_type == artdaq::Fragment::InitFragmentType)
     293              :                         {
     294            0 :                                 TLOG(TLVL_RECEIVEEVENT) << "Returning Init Fragment";
     295            0 :                                 output_event = event_ordering_.front();
     296            0 :                                 event_ordering_.pop_front();
     297            0 :                                 break;  // while(output_event == nullptr)
     298              :                         }
     299            0 :                         if (current_subrun_ != 0 && first_sr <= current_subrun_)
     300              :                         {
     301            0 :                                 if (artdaq::Fragment::isUserFragmentType(first_type) || first_type == artdaq::Fragment::DataFragmentType)
     302              :                                 {
     303            0 :                                         TLOG(TLVL_RECEIVEEVENT) << "Returning Fragment due to subrun match";
     304            0 :                                         output_event = event_ordering_.front();
     305            0 :                                         event_ordering_.pop_front();
     306            0 :                                         break;  // while(output_event == nullptr)
     307              :                                 }
     308            0 :                                 else if (event_ordering_.size() > subrun_closure_threshold_)
     309              :                                 {
     310              :                                         // First Fragment is broadcast (begin/end run/subrun), but there's more in event ordering!
     311            0 :                                         TLOG(TLVL_RECEIVEEVENT) << "Returning Broadcast Fragment due to subrun closure";
     312            0 :                                         output_event = event_ordering_.front();
     313            0 :                                         event_ordering_.pop_front();
     314            0 :                                         break;  // while(output_event == nullptr)
     315              :                                 }
     316            0 :                         }
     317            0 :                         else if (current_subrun_ != 0 && first_sr > current_subrun_ + 1)
     318              :                         {
     319            0 :                                 TLOG(TLVL_RECEIVEEVENT) << "Returning Fragment due to stale current_subrun_ (first_sr=" << first_sr << ", current_subrun_=" << current_subrun_ << ")";
     320            0 :                                 output_event = event_ordering_.front();
     321            0 :                                 event_ordering_.pop_front();
     322            0 :                                 break;  // while(output_event == nullptr)
     323              :                         }
     324              : 
     325            0 :                         if (event_ordering_.size() == 1 && (first_type == artdaq::Fragment::EndOfRunFragmentType || first_type == artdaq::Fragment::RunDataFragmentType))
     326              :                         {
     327            0 :                                 TLOG(TLVL_RECEIVEEVENT) << "Returning Broadcast Fragment due to end-of-run";
     328            0 :                                 output_event = event_ordering_.front();
     329            0 :                                 event_ordering_.pop_front();
     330            0 :                                 break;  // while(output_event == nullptr)
     331              :                         }
     332              : 
     333            0 :                         if (current_subrun_ == 0)
     334              :                         {
     335            0 :                                 if (artdaq::Fragment::isUserFragmentType(first_type) || first_type == artdaq::Fragment::DataFragmentType)
     336              :                                 {
     337            0 :                                         TLOG(TLVL_RECEIVEEVENT) << "Returning Fragment due to unset subrun";
     338            0 :                                         output_event = event_ordering_.front();
     339            0 :                                         event_ordering_.pop_front();
     340            0 :                                         break;  // while(output_event == nullptr)
     341              :                                 }
     342              :                                 // We cannot close a subrun that has not yet been opened
     343            0 :                                 if (first_type == artdaq::Fragment::EndOfSubrunFragmentType || first_type == artdaq::Fragment::SubrunDataFragmentType)
     344              :                                 {
     345            0 :                                         TLOG(TLVL_WARNING) << "Subrun is unset, discarding EndOfSubrun Fragment(s) for subrun " << first_sr;
     346            0 :                                         event_ordering_.pop_front();
     347              :                                 }
     348              :                                 // Likewise, we cannot close a run that is not open
     349            0 :                                 if (first_type == artdaq::Fragment::EndOfRunFragmentType || first_type == artdaq::Fragment::RunDataFragmentType)
     350              :                                 {
     351            0 :                                         TLOG(TLVL_WARNING) << "Subrun is unset, discarding EndOfRun Fragment(s) for run " << event_ordering_.front()->header->run_id;
     352            0 :                                         event_ordering_.pop_front();
     353              :                                 }
     354              :                         }
     355              : 
     356            0 :                         if (artdaq::TimeUtils::GetElapsedTime(start_time) > safety_valve_timeout_s_)
     357              :                         {
     358            0 :                                 TLOG(TLVL_WARNING) << "Returning Fragment due to safety valve timeout (" << safety_valve_timeout_s_ << " s). event_ordering_ size=" << event_ordering_.size()
     359            0 :                                                    << " (th=" << subrun_closure_threshold_ << "), first event type=" << static_cast<int>(first_type) << " sr="
     360            0 :                                                    << first_sr << " (c=" << current_subrun_ << ")";
     361            0 :                                 output_event = event_ordering_.front();
     362            0 :                                 event_ordering_.pop_front();
     363            0 :                                 break;  // while(output_event == nullptr)
     364              :                         }
     365              :                 }
     366              : 
     367            0 :                 auto next_event = ReadEventFromSharedMemory(broadcast);
     368            0 :                 if (next_event == nullptr)
     369              :                 {
     370            0 :                         if (event_ordering_.size() > 0)
     371              :                         {
     372            0 :                                 output_event = event_ordering_.front();
     373            0 :                                 event_ordering_.pop_front();
     374              :                         }
     375              :                         else
     376              :                         {
     377              :                                 // Will return nullptr
     378            0 :                                 break;  // while(output_event == nullptr)
     379              :                         }
     380              :                 }
     381              :                 else
     382              :                 {
     383              :                         // Reset start time when new event arrives
     384            0 :                         start_time = std::chrono::steady_clock::now();
     385            0 :                         TLOG(TLVL_RECEIVEEVENT_2) << "Adding ArtdaqEvent with run=" << next_event->header->run_id << ", subrun=" << next_event->header->subrun_id << ", seq=" << next_event->header->sequence_id << ", and type " << static_cast<int>(next_event->FirstFragmentType()) << " to event ordering list";
     386            0 :                         if (next_event->FirstFragmentType() == artdaq::Fragment::EndOfDataFragmentType) { end_of_data_received_ = true; }
     387            0 :                         else if (next_event->header->subrun_id < current_subrun_ && next_event->FirstFragmentType() != artdaq::Fragment::RunDataFragmentType && next_event->FirstFragmentType() == artdaq::Fragment::EndOfRunFragmentType)
     388              :                         {
     389            0 :                                 auto seq_mask = 0xFFFFFFFF & next_event->header->sequence_id;
     390            0 :                                 TLOG(TLVL_WARNING) << "ArtdaqEvent with run = " << next_event->header->run_id << ", subrun = " << next_event->header->subrun_id << ", seq = " << next_event->header->sequence_id << " (32b mask " << seq_mask << "), and type " << static_cast<int>(next_event->FirstFragmentType()) << " is from a previous subrun! (current=" << current_subrun_ << ")";
     391              :                         }
     392            0 :                         event_ordering_.push_back(next_event);
     393            0 :                         event_ordering_.sort();
     394              :                 }
     395            0 :         }
     396              : 
     397            0 :         if (output_event != nullptr)
     398              :         {
     399            0 :                 auto type = output_event->FirstFragmentType();
     400            0 :                 TLOG(subrun_has_events_ ? TLVL_RECEIVEEVENT_3 : TLVL_INFO) << "Returning ArtdaqEvent with run=" << output_event->header->run_id << ", subrun=" << output_event->header->subrun_id
     401            0 :                                                                            << ", seq=" << output_event->header->sequence_id << ", and type " << static_cast<int>(type);
     402            0 :                 if (output_event->header->subrun_id > current_subrun_ && output_event->header->subrun_id != 65535)  // EndOfRun Fragments have subrun -1
     403              :                 {
     404            0 :                         if (current_subrun_ != 0)
     405              :                         {
     406            0 :                                 TLOG(TLVL_WARNING) << "Event subrun " << output_event->header->subrun_id << " is greater than current_subrun_ (" << current_subrun_ << "), incrementing";
     407              :                         }
     408              :                         else
     409              :                         {
     410            0 :                                 TLOG(TLVL_DEBUG) << "Incrementing current_subrun_ from 0 to " << output_event->header->subrun_id << " due to unset subrun";
     411              :                         }
     412            0 :                         current_subrun_ = output_event->header->subrun_id;
     413              :                 }
     414            0 :                 if (type == artdaq::Fragment::EndOfSubrunFragmentType || type == artdaq::Fragment::SubrunDataFragmentType || type == artdaq::Fragment::EndOfRunFragmentType || type == artdaq::Fragment::RunDataFragmentType || type == artdaq::Fragment::InitFragmentType)
     415              :                 {
     416            0 :                         if (type == artdaq::Fragment::EndOfSubrunFragmentType || type == artdaq::Fragment::SubrunDataFragmentType)
     417              :                         {
     418            0 :                                 TLOG(TLVL_RECEIVEEVENT_4) << "EndOfSubrun or SubrunData Fragment recieved, incrementing current_subrun from " << current_subrun_ << " to " << (current_subrun_ + 1);
     419            0 :                                 current_subrun_++;
     420            0 :                                 subrun_has_events_ = false;
     421            0 :                         }
     422              :                         else
     423              :                         {
     424            0 :                                 TLOG(TLVL_DEBUG) << "Due to run/subrun/control conditions, setting current_subrun to 0";
     425            0 :                                 current_subrun_ = 0;
     426            0 :                                 subrun_has_events_ = false;
     427              :                         }
     428            0 :                 }
     429              :                 else
     430              :                 {
     431            0 :                         subrun_has_events_ = true;
     432              :                 }
     433              :         }
     434              : 
     435            0 :         TLOG(TLVL_RECEIVEEVENT) << "ReceiveEvent END";
     436              : 
     437            0 :         return output_event;
     438            0 : }
     439              : 
     440            0 : DEFINE_ART_SERVICE_INTERFACE_IMPL(ArtdaqSharedMemoryService, ArtdaqSharedMemoryServiceInterface)
        

Generated by: LCOV version 2.0-1