LCOV - code coverage report
Current view: top level - artdaq/DAQrate - SharedMemoryEventManager.hh (source / functions) Coverage Total Hit
Test: artdaq.info.cleaned Lines: 65.5 % 58 38
Test Date: 2025-09-04 00:45:34 Functions: 81.2 % 16 13

            Line data    Source code
       1              : #ifndef ARTDAQ_DAQRATE_SHAREDMEMORYEVENTMANAGER_HH
       2              : #define ARTDAQ_DAQRATE_SHAREDMEMORYEVENTMANAGER_HH
       3              : 
       4              : #include "TRACE/tracemf.h"  // Pre-empt TRACE/trace.h from Fragment.hh.
       5              : #include "artdaq-core/Data/Fragment.hh"
       6              : 
       7              : #include "artdaq-core/Core/SharedMemoryManager.hh"
       8              : #include "artdaq-core/Data/RawEvent.hh"
       9              : #include "artdaq-core/Utilities/configureMessageFacility.hh"
      10              : #include "artdaq/DAQrate/StatisticsHelper.hh"
      11              : #include "artdaq/DAQrate/detail/RequestSender.hh"
      12              : #include "artdaq/DAQrate/detail/TokenSender.hh"
      13              : 
      14              : #include "fhiclcpp/types/Atom.h"
      15              : #include "fhiclcpp/types/Comment.h"
      16              : #include "fhiclcpp/types/ConfigurationTable.h"
      17              : #include "fhiclcpp/types/OptionalTable.h"
      18              : #include "fhiclcpp/types/TableFragment.h"
      19              : 
      20              : #define ART_SUPPORTS_DUPLICATE_EVENTS 0
      21              : 
      22              : #include <sys/stat.h>
      23              : #include <deque>
      24              : #include <fstream>
      25              : #include <iomanip>
      26              : #include <set>
      27              : 
      28              : namespace artdaq {
      29              : 
      30              : /**
      31              :  * \brief art_config_file wraps a temporary file used to configure art
      32              :  */
      33              : class art_config_file
      34              : {
      35              : public:
      36              :         /**
      37              :          * \brief art_config_file Constructor
      38              :          * \param ps ParameterSet to write to temporary file
      39              :          * \param shm_key Shared Memory key to use (if 0, child program will use parent PID to generate)
      40              :          * \param broadcast_key Shared Memory key to use for broadcasts (if 0, child program will use parent PID to generate)
      41              :          */
      42           16 :         explicit art_config_file(fhicl::ParameterSet ps, uint32_t shm_key = 0, uint32_t broadcast_key = 0)
      43           16 :             : dir_name_("/tmp/partition_" + std::to_string(Globals::GetPartitionNumber()))
      44           16 :             , file_name_(dir_name_ + "/artConfig_" + std::to_string(my_rank) + "_" + std::to_string(artdaq::TimeUtils::gettimeofday_us()) + ".fcl")
      45              :         {
      46           16 :                 mkdir(dir_name_.c_str(), 0777);  // Allowed to fail if directory already exists
      47              : 
      48           16 :                 std::ofstream of(file_name_, std::ofstream::trunc);
      49           16 :                 if (of.fail())
      50              :                 {
      51              :                         // Probably a permissions error...
      52            0 :                         dir_name_ = "/tmp/partition_" + std::to_string(Globals::GetPartitionNumber()) + "_" + std::to_string(getuid());
      53            0 :                         mkdir(dir_name_.c_str(), 0777);  // Allowed to fail if directory already exists
      54            0 :                         file_name_ = dir_name_ + "/artConfig_" + std::to_string(my_rank) + "_" + std::to_string(artdaq::TimeUtils::gettimeofday_us()) + ".fcl";
      55              : 
      56            0 :                         of.open(file_name_, std::ofstream::trunc);
      57            0 :                         if (of.fail())
      58              :                         {
      59            0 :                                 TLOG(TLVL_ERROR, "ArtConfigFile") << "Failed to open configuration file after two attemps! ABORTING!";
      60            0 :                                 exit(46);
      61              :                         }
      62              :                 }
      63           16 :                 of << ps.to_string();
      64              : 
      65           60 :                 if (ps.has_key("services") && ps.has_key("services.message"))
      66              :                 {
      67            0 :                         auto existing_message_config = ps.get<fhicl::ParameterSet>("services.message");
      68            0 :                         auto existing_destinations = existing_message_config.get<fhicl::ParameterSet>("destinations");
      69            0 :                         auto generated_message_config = generateMessageFacilityConfiguration(mf::GetApplicationName().c_str(), true, false, "-art");
      70            0 :                         auto generated_message_pset = fhicl::ParameterSet::make(generated_message_config);
      71            0 :                         auto generated_destinations = generated_message_pset.get<fhicl::ParameterSet>("destinations");
      72            0 :                         for (auto& dest : generated_destinations.get_pset_names())
      73              :                         {
      74            0 :                                 existing_destinations.put(dest, generated_destinations.get<fhicl::ParameterSet>(dest));
      75            0 :                         }
      76            0 :                         existing_message_config.put_or_replace("destinations", existing_destinations);
      77            0 :                         of << " services.message: { " + existing_message_config.to_string() << "} ";
      78            0 :                 }
      79              :                 else
      80              :                 {
      81           16 :                         of << " services.message: { " << generateMessageFacilityConfiguration(mf::GetApplicationName().c_str(), true, false, "-art") << "} ";
      82              :                 }
      83              : 
      84           16 :                 if (shm_key > 0 || broadcast_key > 0) TLOG(TLVL_INFO, "ArtConfigFile") << "Inserting Shared memory keys (0x" << std::hex << shm_key << ", 0x" << std::hex << broadcast_key << ") into source config";
      85           16 :                 if (shm_key > 0) of << " source.shared_memory_key: 0x" << std::hex << shm_key;
      86           16 :                 if (broadcast_key > 0) of << " source.broadcast_shared_memory_key: 0x" << std::hex << broadcast_key;
      87              : 
      88           16 :                 of.flush();
      89           16 :                 of.close();
      90           16 :         }
      91           16 :         ~art_config_file()
      92              :         {
      93           16 :                 remove(file_name_.c_str());
      94           16 :                 rmdir(dir_name_.c_str());  // Will only delete directory if no config files are left over
      95           16 :         }
      96              :         /**
      97              :          * \brief Get the path of the temporary file
      98              :          * \return The path of the temporary file
      99              :          */
     100            6 :         std::string getFileName() const { return file_name_; }
     101              : 
     102              : private:
     103              :         art_config_file(art_config_file const&) = delete;
     104              :         art_config_file(art_config_file&&) = delete;
     105              :         art_config_file& operator=(art_config_file const&) = delete;
     106              :         art_config_file& operator=(art_config_file&&) = delete;
     107              : 
     108              :         std::string dir_name_;
     109              :         std::string file_name_;
     110              : };
     111              : 
     112              : /**
     113              :  * \brief The SharedMemoryEventManager is a SharedMemoryManager which tracks events as they are built
     114              :  */
     115              : class SharedMemoryEventManager : public SharedMemoryManager
     116              : {
     117              : public:
     118              :         static const std::string FRAGMENTS_RECEIVED_STAT_KEY;  ///< Key for Fragments Received MonitoredQuantity
     119              :         static const std::string EVENTS_RELEASED_STAT_KEY;     ///< Key for the Events Released MonitoredQuantity
     120              : 
     121              :         typedef RawEvent::run_id_t run_id_t;                     ///< Copy RawEvent::run_id_t into local scope
     122              :         typedef RawEvent::subrun_id_t subrun_id_t;               ///< Copy RawEvent::subrun_id_t into local scope
     123              :         typedef Fragment::sequence_id_t sequence_id_t;           ///< Copy Fragment::sequence_id_t into local scope
     124              :         typedef std::map<sequence_id_t, RawEvent_ptr> EventMap;  ///< An EventMap is a map of RawEvent_ptr objects, keyed by sequence ID
     125              : 
     126              :         /// <summary>
     127              :         /// Configuration of the SharedMemoryEventManager. May be used for parameter validation
     128              :         /// </summary>
     129              :         struct Config
     130              :         {
     131              :                 /// "max_event_size_bytes" REQUIRED: Maximum event size (all Fragments), in bytes
     132              :                 /// Either max_fragment_size_bytes or max_event_size_bytes must be specified
     133              :                 fhicl::Atom<size_t> max_event_size_bytes{fhicl::Name{"max_event_size_bytes"}, fhicl::Comment{"Maximum event size (all Fragments), in bytes"}};
     134              :                 /// "stale_buffer_timeout_usec" (Default: event_queue_wait_time * 1,000,000): Maximum amount of time elapsed before a buffer is marked as abandoned. Time is reset each time an operation is performed on the buffer.
     135              :                 fhicl::Atom<size_t> stale_buffer_timeout_usec{fhicl::Name{"stale_buffer_timeout_usec"}, fhicl::Comment{"Maximum amount of time elapsed before a buffer is marked as abandoned. Time is reset each time an operation is performed on the buffer."}, 5000000};
     136              :                 /// "overwrite_mode" (Default: false): Whether new data is allowed to overwrite buffers in the "Full" state
     137              :                 fhicl::Atom<bool> overwrite_mode{fhicl::Name{"overwrite_mode"}, fhicl::Comment{"Whether buffers are allowed to be overwritten when safe (state == Full or Reading)"}, false};
     138              :                 /// "restart_crashed_art_processes" (Default: true) : Whether to automatically restart art processes that fail for any reason
     139              :                 fhicl::Atom<bool> restart_crashed_art_processes{fhicl::Name{"restart_crashed_art_processes"}, fhicl::Comment{"Whether to automatically restart art processes that fail for any reason"}, true};
     140              :                 /// "shared_memory_key" (Default 0xBEE70000 + PID) : Key to use for shared memory access
     141              :                 fhicl::Atom<uint32_t> shared_memory_key{fhicl::Name{"shared_memory_key"}, fhicl::Comment{"Key to use for shared memory access"}, 0xBEE70000 + getpid()};
     142              :                 /// "buffer_count" REQUIRED: Number of events in the Shared Memory (incomplete + pending art)
     143              :                 fhicl::Atom<size_t> buffer_count{fhicl::Name{"buffer_count"}, fhicl::Comment{"Number of events in the Shared Memory (incomplete + pending art)"}};
     144              :                 /// "max_fragment_size_bytes" REQUIRED: Maximum Fragment size, in bytes
     145              :                 /// Either max_fragment_size_bytes or max_event_size_bytes must be specified
     146              :                 fhicl::Atom<size_t> max_fragment_size_bytes{fhicl::Name{"max_fragment_size_bytes"}, fhicl::Comment{" Maximum Fragment size, in bytes"}};
     147              :                 /// "event_queue_wait_time" (Default: 5): Amount of time (in seconds) an event can exist in shared memory before being released to art. Used as input to default parameter of "stale_buffer_timeout_usec".
     148              :                 fhicl::Atom<size_t> event_queue_wait_time{fhicl::Name{"event_queue_wait_time"}, fhicl::Comment{"Amount of time (in seconds) an event can exist in shared memory before being released to art. Used as input to default parameter of \"stale_buffer_timeout_usec\"."}, 5};
     149              :                 /// "broadcast_mode" (Default: false): When true, buffers are not marked Empty when read, but return to Full state. Buffers are overwritten in order received.
     150              :                 fhicl::Atom<bool> broadcast_mode{fhicl::Name{"broadcast_mode"}, fhicl::Comment{"When true, buffers are not marked Empty when read, but return to Full state. Buffers are overwritten in order received."}, false};
     151              :                 /// "art_analyzer_count" (Default: 1): Number of art processes to start
     152              :                 fhicl::Atom<size_t> art_analyzer_count{fhicl::Name{"art_analyzer_count"}, fhicl::Comment{"Number of art procceses to start"}, 1};
     153              :                 /// "expected_fragments_per_event" (REQUIRED) : Number of Fragments to expect per event
     154              :                 fhicl::Atom<size_t> expected_fragments_per_event{fhicl::Name{"expected_fragments_per_event"}, fhicl::Comment{"Number of Fragments to expect per event"}};
     155              :                 /// "maximum_oversize_fragment_count" (Default: 1): Maximum number of over-size Fragments to drop before throwing an exception. Default is 1, which means to throw an exception if any over-size Fragments are dropped. Set to 0 to disable.
     156              :                 fhicl::Atom<int> maximum_oversize_fragment_count{fhicl::Name{"maximum_oversize_fragment_count"}, fhicl::Comment{"Maximum number of over-size Fragments to drop before throwing an exception.  Default is 1, which means to throw an exception if any over-size Fragments are dropped. Set to 0 to disable."}, 1};
     157              :                 /// "update_run_ids_on_new_fragment" (Default: true) : Whether the run and subrun ID of an event should be updated whenever a Fragment is added.
     158              :                 fhicl::Atom<bool> update_run_ids_on_new_fragment{fhicl::Name{"update_run_ids_on_new_fragment"}, fhicl::Comment{"Whether the run and subrun ID of an event should be updated whenever a Fragment is added."}, true};
     159              :                 /// "use_sequence_id_for_event_number" (Default: true): Whether to use the artdaq Sequence ID (true) or the Timestamp (false) for art Event numbers
     160              :                 fhicl::Atom<bool> use_sequence_id_for_event_number{fhicl::Name{"use_sequence_id_for_event_number"}, fhicl::Comment{"Whether to use the artdaq Sequence ID (true) or the Timestamp (false) for art Event numbers"}, true};
     161              :                 /// "max_subrun_lookup_table_size" (Default: 100): The maximum number of entries to store in the sequence ID-SubRun ID lookup table
     162              :                 fhicl::Atom<size_t> max_subrun_lookup_table_size{fhicl::Name{"max_subrun_lookup_table_size"}, fhicl::Comment{"The maximum number of entries to store in the sequence ID-SubRun ID lookup table"}, 100};
     163              :                 /// "max_event_list_length" (Default: 100): The maximum number of entries to store in the released events list
     164              :                 fhicl::Atom<size_t> max_event_list_length{fhicl::Name{"max_event_list_length"}, fhicl::Comment{" The maximum number of entries to store in the released events list"}, 100};
     165              :                 /// "send_init_fragments" (Default: true): Whether Init Fragments are expected to be sent to art. If true, a Warning message is printed when an Init Fragment is requested but none are available.
     166              :                 fhicl::Atom<bool> send_init_fragments{fhicl::Name{"send_init_fragments"}, fhicl::Comment{"Whether Init Fragments are expected to be sent to art. If true, a Warning message is printed when an Init Fragment is requested but none are available."}, true};
     167              :                 /// "open_event_report_interval_ms" (Default: -1): Interval at which an open event report should be written
     168              :                 fhicl::Atom<int> open_event_report_interval_ms{fhicl::Name{"open_event_report_interval_ms"}, fhicl::Comment{"Interval at which an open event report should be written"}, -1};
     169              :                 /// "fragment_broadcast_timeout_ms" (Default: 3000): Amount of time broadcast fragments should live in the broadcast shared memory segment
     170              :                 /// A "Broadcast shared memory segment" is used for all system-level fragments, such as Init, Start/End Run, Start/End Subrun and EndOfData
     171              :                 fhicl::Atom<int> fragment_broadcast_timeout_ms{fhicl::Name{"fragment_broadcast_timeout_ms"}, fhicl::Comment{"Amount of time broadcast fragments should live in the broadcast shared memory segment"}, 3000};
     172              :                 /// "art_command_line"  (Default: "art -c \#CONFIG_FILE\#"): Command line used to start analysis processes. Supports two special sequences: \#CONFIG_FILE\# will be replaced with the fhicl config file. \#PROCESS_INDEX\# will be replaced by the index of the art process.
     173              :                 fhicl::Atom<std::string> art_command_line{fhicl::Name{"art_command_line"}, fhicl::Comment{"Command line used to start analysis processes. Supports two special sequences: #CONFIG_FILE# will be replaced with the fhicl config file. #PROCESS_INDEX# will be replaced by the index of the art process."}, "art -c #CONFIG_FILE#"};
     174              :                 /// "art_index_offset" (Default: 0): Offset to add to art process index when replacing \#PROCESS_INDEX\#
     175              :                 fhicl::Atom<size_t> art_index_offset{fhicl::Name{"art_index_offset"}, fhicl::Comment{"Offset to add to art process index when replacing #PROCESS_INDEX#"}, 0};
     176              :                 /// "minimum_art_lifetime_s" (Default: 2 seconds): Amount of time that an art process should run to not be considered "DOA"
     177              :                 fhicl::Atom<double> minimum_art_lifetime_s{fhicl::Name{"minimum_art_lifetime_s"}, fhicl::Comment{"Amount of time that an art process should run to not be considered \"DOA\""}, 2.0};
     178              :                 /// "expected_art_event_processing_time_us" (Default: 100000 us): During shutdown, SMEM will wait for this amount of time while it is checking that the art threads are done reading buffers.
     179              :                 ///                                                                                                                              (TUNING: Should be slightly longer than the mean art processing time, but not so long that the Stop transition times out)
     180              :                 fhicl::Atom<size_t> expected_art_event_processing_time_us{fhicl::Name{"expected_art_event_processing_time_us"}, fhicl::Comment{"During shutdown, SMEM will wait for this amount of time while it is checking that the art threads are done reading buffers."}, 100000};
     181              :                 /// "broadcast_shared_memory_key" (Default: 0xCEE70000 + PID): Key to use for broadcast shared memory access
     182              :                 fhicl::Atom<uint32_t> broadcast_shared_memory_key{fhicl::Name{"broadcast_shared_memory_key"}, fhicl::Comment{""}, 0xCEE70000 + getpid()};
     183              :                 /// "broadcast_buffer_count" (Default: 10): Buffers in the broadcast shared memory segment
     184              :                 fhicl::Atom<size_t> broadcast_buffer_count{fhicl::Name{"broadcast_buffer_count"}, fhicl::Comment{"Buffers in the broadcast shared memory segment"}, 10};
     185              :                 /// "broadcast_buffer_size" (Default: 0x100000): Size of the buffers in the broadcast shared memory segment
     186              :                 fhicl::Atom<size_t> broadcast_buffer_size{fhicl::Name{"broadcast_buffer_size"}, fhicl::Comment{"Size of the buffers in the broadcast shared memory segment"}, 0x100000};
     187              :                 /// "use_art" (Default: true): Whether to start and manage art threads (Sets art_analyzer count to 0 and overwrite_mode to true when false)
     188              :                 fhicl::Atom<bool> use_art{fhicl::Name{"use_art"}, fhicl::Comment{"Whether to start and manage art threads (Sets art_analyzer count to 0 and overwrite_mode to true when false)"}, true};
     189              :                 /// "manual_art" (Default: false): Prints the startup command line for the art process so that the user may (for example) run it in GDB or valgrind
     190              :                 fhicl::Atom<bool> manual_art{fhicl::Name{"manual_art"}, fhicl::Comment{"Prints the startup command line for the art process so that the user may (for example) run it in GDB or valgrind"}, false};
     191              :                 /// Configuration of the RequestSender. See artdaq::RequestSender::Config
     192              :                 fhicl::TableFragment<artdaq::RequestSender::Config> requestSenderConfig;
     193              :                 /// Configuration of the TokenSender. See artdaq::TokenSender::Config
     194              :                 fhicl::OptionalTable<artdaq::TokenSender::Config> tokenSenderConfig{fhicl::Name{"routing_token_config"}, fhicl::Comment{"Configuration for the Routing TokenSender"}};
     195              :         };
     196              :         /// Used for ParameterSet validation (if desired)
     197              :         using Parameters = fhicl::WrappedTable<Config>;
     198              : 
     199              :         /**
     200              :          * \brief SharedMemoryEventManager Constructor
     201              :          * \param pset ParameterSet used to configure SharedMemoryEventManager. See artdaq::SharedMemoryEventManager::Config for description of parameters
     202              :          * \param art_pset ParameterSet used to configure art. See art::Config for description of expected document format
     203              :          */
     204              :         SharedMemoryEventManager(const fhicl::ParameterSet& pset, fhicl::ParameterSet art_pset);
     205              :         /**
     206              :          * \brief SharedMemoryEventManager Destructor
     207              :          */
     208              :         virtual ~SharedMemoryEventManager() noexcept;
     209              : 
     210              : private:
     211              :         /**
     212              :          * \brief Add a Fragment to the SharedMemoryEventManager
     213              :          * \param frag Header of the Fragment (seq ID and size info)
     214              :          * \param dataPtr Pointer to the fragment's data (i.e. Fragment::headerAddress())
     215              :          * \return Whether the Fragment was successfully added
     216              :          */
     217              :         bool AddFragment(detail::RawFragmentHeader frag, void* dataPtr);
     218              : 
     219              : public:
     220              :         /**
     221              :          * \brief Copy a Fragment into the SharedMemoryEventManager
     222              :          * \param frag FragmentPtr object
     223              :          * \param timeout_usec Timeout for adding Fragment to the Shared Memory
     224              :          * \param [out] outfrag Rejected Fragment if timeout occurs
     225              :          * \return Whether the Fragment was successfully added
     226              :          */
     227              :         bool AddFragment(FragmentPtr frag, size_t timeout_usec, FragmentPtr& outfrag);
     228              : 
     229              :         /**
     230              :          * \brief Get a pointer to a reserved memory area for the given Fragment header
     231              :          * \param frag Fragment header (contains sequence ID and size information)
     232              :          * \param dropIfNoBuffersAvailable Whether to drop the fragment (instead of returning nullptr) when no buffers are available (Default: false)
     233              :          * \return Pointer to memory location for Fragment body (Header is copied into buffer here)
     234              :          */
     235              :         RawDataType* WriteFragmentHeader(detail::RawFragmentHeader frag, bool dropIfNoBuffersAvailable = false);
     236              : 
     237              :         /**
     238              :          * \brief Used to indicate that the given Fragment is now completely in the buffer. Will check for buffer completeness, and unset the pending flag.
     239              :          * \param frag Fragment that is now completely in the buffer.
     240              :          */
     241              :         void DoneWritingFragment(detail::RawFragmentHeader frag);
     242              : 
     243              :         /**
     244              :          * \brief Returns the number of buffers which contain data but are not yet complete
     245              :          * \return The number of buffers which contain data but are not yet complete
     246              :          */
     247           41 :         size_t GetOpenEventCount() { return active_buffers_.size(); }
     248              : 
     249              :         /**
     250              :          * \brief Returns the number of events which are complete but waiting on lower sequenced events to finish
     251              :          * \return The number of events which are complete but waiting on lower sequenced events to finish
     252              :          */
     253            6 :         size_t GetPendingEventCount() { return pending_buffers_.size(); }
     254              : 
     255              :         /**
     256              :          * \brief Returns the number of buffers currently owned by this manager
     257              :          * \return The number of buffers currently owned by this manager
     258              :          */
     259            1 :         size_t GetLockedBufferCount() { return GetBuffersOwnedByManager().size(); }
     260              : 
     261              :         /**
     262              :          * \brief Returns the number of events sent to art this run
     263              :          * \return The number of events sent to art this run
     264              :          */
     265           19 :         size_t GetArtEventCount() { return run_event_count_; }
     266              : 
     267              :         /**
     268              :          * \brief Get the count of Fragments of a given type in an event
     269              :          * \param seqID Sequence ID of Fragments
     270              :          * \param type Type of fragments to count. Use InvalidFragmentType to count all fragments (default)
     271              :          * \return Number of Fragments in event of given type
     272              :          */
     273              :         size_t GetFragmentCount(Fragment::sequence_id_t seqID, Fragment::type_t type = Fragment::InvalidFragmentType);
     274              : 
     275              :         /**
     276              :          * \brief Get the count of Fragments of a given type in a buffer
     277              :          * \param buffer Buffer to count
     278              :          * \param type Type of fragments to count. Use InvalidFragmentType to count all fragments (default)
     279              :          * \return Number of Fragments in buffer of given type
     280              :          */
     281              :         size_t GetFragmentCountInBuffer(int buffer, Fragment::type_t type = Fragment::InvalidFragmentType);
     282              : 
     283              :         void UpdateFragmentHeader(int buffer, detail::RawFragmentHeader hdr);
     284              : 
     285              :         /**
     286              :          * \brief Run an art instance, recording the return codes and restarting it until the end flag is raised
     287              :          */
     288              :         void RunArt(size_t process_index, const std::shared_ptr<std::atomic<pid_t>>& pid_out);
     289              :         /**
     290              :          * \brief Start all the art processes
     291              :          */
     292              :         void StartArt();
     293              : 
     294              :         /**
     295              :          * \brief Start one art process
     296              :          * \param pset ParameterSet to send to this art process
     297              :          * \param process_index Index of this art process (when starting multiple)
     298              :          * \return pid_t of the started process
     299              :          */
     300              :         pid_t StartArtProcess(fhicl::ParameterSet pset, size_t process_index);
     301              : 
     302              :         /**
     303              :          * \brief Shutdown a set of art processes
     304              :          * \param pids PIDs of the art processes
     305              :          */
     306              :         void ShutdownArtProcesses(std::set<pid_t>& pids);
     307              : 
     308              :         /**
     309              :          * \brief Restart all art processes, using the given fhicl code to configure the new art processes
     310              :          * \param art_pset ParameterSet used to configure art
     311              :          * \param newRun New Run number for reconfigured art
     312              :          * \param n_art_processes Number of art processes to start, -1 (default) leaves the number unchanged
     313              :          */
     314              :         void ReconfigureArt(fhicl::ParameterSet art_pset, run_id_t newRun = 0, int n_art_processes = -1);
     315              : 
     316              :         /**
     317              :          * \brief Indicate that the end of input has been reached to the art processes.
     318              :          * \return True if the end proceeded correctly
     319              :          *
     320              :          * Put the end-of-data marker onto the RawEvent queue (if possible),
     321              :          * wait for the reader function to exit, and fill in the reader return
     322              :          * value.  This scenario returns true.  If the end-of-data marker
     323              :          * can not be pushed onto the RawEvent queue, false is returned.
     324              :          */
     325              :         bool endOfData();
     326              : 
     327              :         /**
     328              :          * \brief Start a new Run
     329              :          * \param runID Run number of the new run
     330              :          */
     331              :         void startRun(run_id_t runID);
     332              : 
     333              :         /**
     334              :          * \brief Get the current Run number
     335              :          * \return The current Run number, as stored in run_id_
     336              :          */
     337            6 :         run_id_t runID() const { return run_id_; }
     338              : 
     339              :         /**
     340              :          * \brief Send an EndOfRunFragment to the art thread
     341              :          * \return True if the enqueue was successful
     342              :          */
     343              :         bool endRun();
     344              : 
     345              :         /**
     346              :          * \brief Roll over the subrun after the specified event
     347              :          * \param boundary Sequence ID of the boundary (the Event with seqID == boundary will be in the new subrun)
     348              :          * \param subrun Subrun number of the subrun after the boundary
     349              :          * \param sendFragment Create and send an EndOfSubrun Fragment for this transition
     350              :          */
     351              :         void rolloverSubrun(sequence_id_t boundary, subrun_id_t subrun, bool sendFragment);
     352              : 
     353              :         /**
     354              :          * \brief Add a subrun transition immediately after the highest currently defined sequence ID
     355              :          * \param sendFragment Presumably whether to create and send an EndOfSubrun Fragment for this transition, as in the three-argument overload (TODO confirm) */
     356              :         void rolloverSubrun(bool sendFragment);
     357              : 
     358              :         /**
     359              :          * \brief Send metrics to the MetricManager, if one has been instantiated in the application (takes no arguments and returns nothing)
     360              :          */
     361              :         void sendMetrics();
     362              : 
     363              :         /**
     364              :          * \brief Set the RequestMessageMode for all outgoing data requests (does nothing when requests_ is null)
     365              :          * \param mode Mode to set; forwarded unchanged to requests_->SetRequestMode()
     366              :          */
     367            2 :         void setRequestMode(detail::RequestMessageMode mode)
     368              :         {
     369            2 :                 if (requests_) requests_->SetRequestMode(mode);  // requests_ may be unset; the call is skipped silently in that case
     370            2 :         }
     371              : 
     372              :         /**
     373              :          * \brief Set the overwrite flag (non-reliable data transfer) for the Shared Memory
     374              :          * \param overwrite Whether to allow the writer to overwrite data that has not yet been read
     375              :          */
     376              :         void setOverwrite(bool overwrite) { overwrite_mode_ = overwrite; }  // only stores the flag in overwrite_mode_
     377              : 
     378              :         /**
     379              :          * \brief Set the stored Init fragment, if one has not already been set.
     380              :          * \param frag Init Fragment to store (FragmentPtr taken by non-const reference; NOTE(review): ownership handling is not visible here - confirm) */
     381              :         void AddInitFragment(FragmentPtr& frag);
     382              : 
     383              :         /**
     384              :          * @brief Add a Fragment for broadcast. May be collected with other Fragments before sending
     385              :          * @param frag Fragment to broadcast (FragmentPtr taken by non-const reference)
     386              :          */
     387              :         void BroadcastFragment(FragmentPtr& frag);
     388              : 
     389              :         /**
     390              :          * \brief Gets the shared memory key of the broadcast SharedMemoryManager
     391              :          * \return The shared memory key of the broadcast SharedMemoryManager, i.e. broadcasts_.GetKey()
     392              :          */
     393            1 :         uint32_t GetBroadcastKey() { return broadcasts_.GetKey(); }
     394              : 
     395              :         /**
     396              :          * \brief Gets the address of the "dropped data" fragment. Used for testing.
     397              :          * \param frag Fragment header to look up among the entries of dropped_data_
     398              :          * \return Pointer to the data payload (dataBegin()) of the first matching "dropped data" fragment, or nullptr if no entry matches
     399              :          */
     400            2 :         RawDataType* GetDroppedDataAddress(detail::RawFragmentHeader frag)
     401              :         {
     402            2 :                 for (auto it = dropped_data_.begin(); it != dropped_data_.end(); ++it)  // linear scan; the first match wins
     403              :                 {
     404            2 :                         if (frag.operator==(it->first))  // TODO, ELF 5/26/2023: Workaround until artdaq_core can be fixed for C++20
     405              :                         {
     406            2 :                                 return it->second->dataBegin();
     407              :                         }
     408              :                 }
     409            0 :                 return nullptr;  // no entry of dropped_data_ compared equal to frag
     410              :         }
     411              : 
     412              :         /**
     413              :          * \brief Updates the internally-stored copy of the art configuration.
     414              :          * \param art_pset ParameterSet used to configure art
     415              :          *
     416              :          * This method updates the internally-stored copy of the art configuration, but it does not
     417              :          * restart art processes.  So, if this method is called while art processes are running, it will
     418              :          * have no effect until the next restart, such as the next start of run.  Typically, this
     419              :          * method is intended to be called between runs, when no art processes are running.
     420              :          */
     421              :         void UpdateArtConfiguration(fhicl::ParameterSet art_pset);
     422              : 
     423              :         /**
     424              :          * \brief Check for buffers which are ready to be marked incomplete and released to art, and issue tokens for any buffers which are available
     425              :          */
     426              :         void CheckPendingBuffers();
     427              : 
     428              :         /**
     429              :          * \brief Get the subrun number that the given Sequence ID would be assigned to
     430              :          * \param seqID Sequence ID to look up
     431              :          * \return Subrun number that the given sequence ID will be associated with
     432              :          */
     433              :         subrun_id_t GetSubrunForSequenceID(Fragment::sequence_id_t seqID);
     434              : 
     435              :         /**
     436              :          * \brief Get the current subrun number (Gets the last defined subrun)
     437              :          * \return Number of the subrun that corresponds to events with the maximum possible sequence ID; obtained by calling GetSubrunForSequenceID(Fragment::InvalidSequenceID).
     438              :          */
     439            4 :         subrun_id_t GetCurrentSubrun() { return GetSubrunForSequenceID(Fragment::InvalidSequenceID); }
     440              : 
     441            0 :         std::string BuildStatisticsString() const { return buildStatisticsString_(); };  ///< Public wrapper: returns the result of the private buildStatisticsString_()
     442              : 
     443              : private:
     444              :         SharedMemoryEventManager(SharedMemoryEventManager const&) = delete;  // copy construction disabled
     445              :         SharedMemoryEventManager(SharedMemoryEventManager&&) = delete;  // move construction disabled
     446              :         SharedMemoryEventManager& operator=(SharedMemoryEventManager const&) = delete;  // copy assignment disabled
     447              :         SharedMemoryEventManager& operator=(SharedMemoryEventManager&&) = delete;  // move assignment disabled
     448              : 
     449           44 :         size_t get_art_process_count_()  // returns the number of PIDs currently held in art_processes_
     450              :         {
     451           44 :                 std::unique_lock<std::mutex> lk(art_process_mutex_);  // hold art_process_mutex_ while the set is read
     452           88 :                 return art_processes_.size();
     453           44 :         }
     454              : 
     455              :         std::string buildStatisticsString_() const;  // invoked by the public BuildStatisticsString() wrapper
     456              : 
     457              : private:
     458              :         size_t num_art_processes_;
     459              :         size_t const num_fragments_per_event_;
     460              :         size_t const queue_size_;
     461              :         run_id_t run_id_;  // returned by runID()
     462              : 
     463              :         std::map<sequence_id_t, subrun_id_t> subrun_event_map_;
     464              :         subrun_id_t subrun_id_;
     465              :         size_t max_subrun_event_map_length_;
     466              :         static std::mutex subrun_event_map_mutex_;  // static: a single mutex shared by every instance
     467              :         double subrun_transition_hold_time_s_;
     468              :         std::chrono::steady_clock::time_point last_event_time_;
     469              : 
     470              :         std::set<int> active_buffers_;
     471              :         std::set<int> pending_buffers_;
     472              :         std::unordered_map<Fragment::sequence_id_t, size_t> released_incomplete_events_;
     473              :         std::set<Fragment::sequence_id_t> released_events_;
     474              :         size_t max_event_list_length_;
     475              : 
     476              :         bool update_run_ids_;
     477              :         bool use_sequence_id_for_event_number_;
     478              :         bool overwrite_mode_;  // written by setOverwrite()
     479              :         size_t init_fragment_count_;
     480              :         std::atomic<bool> running_;
     481              : 
     482              :         std::unordered_map<int, std::atomic<int>> buffer_writes_pending_;
     483              :         std::unordered_map<int, std::mutex> buffer_mutexes_;
     484              :         static std::mutex sequence_id_mutex_;  // static: a single mutex shared by every instance
     485              : 
     486              :         int open_event_report_interval_ms_;
     487              :         std::chrono::steady_clock::time_point last_open_event_report_time_;
     488              :         std::chrono::steady_clock::time_point last_backpressure_report_time_;
     489              :         std::chrono::steady_clock::time_point last_fragment_header_write_time_;
     490              :         std::vector<std::chrono::steady_clock::time_point> event_timing_;
     491              : 
     492              :         StatisticsHelper statsHelper_;
     493              : 
     494              :         int broadcast_timeout_ms_;
     495              : 
     496              :         std::atomic<int> run_event_count_;
     497              :         std::atomic<int> run_incomplete_event_count_;
     498              :         std::atomic<int> subrun_event_count_;
     499              :         std::atomic<int> subrun_incomplete_event_count_;
     500              :         std::atomic<int> oversize_fragment_count_;
     501              :         int maximum_oversize_fragment_count_;
     502              : 
     503              :         mutable std::mutex art_process_mutex_;  // locked in get_art_process_count_() before art_processes_ is read
     504              :         std::set<pid_t> art_processes_;  // its size() is what get_art_process_count_() reports
     505              :         std::atomic<bool> restart_art_;
     506              :         bool always_restart_art_;
     507              :         std::atomic<bool> manual_art_;
     508              :         fhicl::ParameterSet current_art_pset_;
     509              :         std::shared_ptr<art_config_file> current_art_config_file_;
     510              :         std::string art_cmdline_;
     511              :         size_t art_process_index_offset_;
     512              :         double minimum_art_lifetime_s_;
     513              :         size_t art_event_processing_time_us_;
     514              : 
     515              :         std::unique_ptr<RequestSender> requests_;  // may be null: setRequestMode() checks before dereferencing
     516              :         std::unique_ptr<TokenSender> tokens_;
     517              :         fhicl::ParameterSet data_pset_;
     518              : 
     519              :         std::mutex init_fragments_mutex_;
     520              :         std::unordered_map<Fragment::fragment_id_t, std::unordered_map<int, FragmentPtr>> init_fragment_map_;
     521              :         bool init_frags_sent_{false};
     522              :         std::list<std::pair<detail::RawFragmentHeader, FragmentPtr>> dropped_data_;  // scanned linearly by GetDroppedDataAddress()
     523              : 
     524              :         mutable std::mutex broadcast_mutex_;
     525              :         struct BroadcastEntry  // element type of the pending_broadcasts_ vector
     526              :         {
     527              :                 Fragment::type_t type;
     528              :                 Fragment::sequence_id_t sequence_id;
     529              :                 subrun_id_t subrun_id;
     530              :                 FragmentPtrs fragments;
     531              :                 std::chrono::steady_clock::time_point deadline;  // expressed on std::chrono::steady_clock
     532              :         };
     533              :         std::vector<BroadcastEntry> pending_broadcasts_;
     534              :         void check_pending_broadcasts_();
     535              : 
     536              :         bool broadcastFragments_(FragmentPtrs& frags);
     537              : 
     538              :         detail::RawEventHeader* getEventHeader_(int buffer);
     539              : 
     540              :         int getBufferForSequenceID_(Fragment::sequence_id_t seqID, bool create_new, Fragment::timestamp_t timestamp = Fragment::InvalidTimestamp);  // timestamp is optional and defaults to Fragment::InvalidTimestamp
     541              :         bool hasFragments_(int buffer);
     542              :         void complete_buffer_(int buffer);
     543              :         bool bufferComparator(int bufA, int bufB);
     544              :         void check_pending_buffers_(std::unique_lock<std::mutex> const& lock);  // NOTE(review): the lock argument presumably shows that the caller already holds the relevant mutex - confirm
     545              :         std::vector<char*> parse_art_command_line_(const std::shared_ptr<art_config_file>& config_file, size_t process_index);
     546              : 
     547              :         void send_init_frags_();
     548              :         size_t init_fragment_map_size_() const;
     549              :         SharedMemoryManager broadcasts_;  // its key is exposed through GetBroadcastKey()
     550              : };
     551              : }  // namespace artdaq
     552              : 
     553              : #endif  // ARTDAQ_DAQRATE_SHAREDMEMORYEVENTMANAGER_HH
        

Generated by: LCOV version 2.0-1