LCOV - code coverage report
Current view: top level - artdaq/DAQrate - FragmentBuffer.hh (source / functions) Coverage Total Hit
Test: artdaq.info.cleaned Lines: 92.9 % 14 13
Test Date: 2025-09-04 00:45:34 Functions: 100.0 % 7 7

            Line data    Source code
       1              : #ifndef artdaq_Application_FragmentBuffer_hh
       2              : #define artdaq_Application_FragmentBuffer_hh
       3              : 
       4              : #include "fhiclcpp/types/Sequence.h"  // Must pre-empt fhiclcpp/types/Atom.h
       5              : 
       6              : #include "TRACE/tracemf.h"  // Pre-empt TRACE/trace.h from Fragment.hh.
       7              : #include "artdaq-core/Data/Fragment.hh"
       8              : 
       9              : #include "artdaq/DAQrate/RequestBuffer.hh"
      10              : 
      11              : namespace fhicl {
      12              : class ParameterSet;
      13              : }
      14              : #include "fhiclcpp/types/Atom.h"
      15              : #include "fhiclcpp/types/Comment.h"
      16              : #include "fhiclcpp/types/ConfigurationTable.h"
      17              : #include "fhiclcpp/types/Name.h"
      18              : 
      19              : // Socket Includes
      20              : #include <arpa/inet.h>
      21              : #include <netinet/in.h>
      22              : #include <sys/socket.h>
      23              : #include <sys/types.h>
      24              : #include <unistd.h>
      25              : 
      26              : #include <array>
      27              : #include <atomic>
      28              : #include <chrono>
      29              : #include <condition_variable>
      30              : #include <list>
      31              : #include <mutex>
      32              : #include <queue>
      33              : 
      34              : namespace artdaq {
      35              : /**
      36              :  * \brief The RequestMode enumeration contains the possible ways in which FragmentBuffer responds to data requests.
      37              :  */
      38              : enum class RequestMode
      39              : {
      40              :         Single,      ///< Respond to each request with the latest Fragment received
      41              :         Buffer,      ///< Respond to each request with all Fragments received since the last request
      42              :         Window,      ///< Respond with all buffered Fragments whose timestamp falls within the request window
      43              :         SequenceID,  ///< Respond to each request with all Fragments that match the sequence ID in the request
      44              :         Ignored      ///< Request messages are ignored ("push" mode)
      45              : };
      46              : 
      47              : /**
      48              :  * \brief FragmentBuffer is a FragmentGenerator-derived
      49              :  * abstract class that defines the interface for a FragmentGenerator
      50              :  * designed as a state machine with start, stop, etc., transition
      51              :  * commands.
      52              :  *
      53              :  * Users of classes derived from
      54              :  * FragmentBuffer will call these transitions via the
      55              :  * publicly defined StartCmd(), StopCmd(), etc.; these public
      56              :  * functions contain functionality considered properly universal to
      57              :  * all FragmentBuffer-derived classes, including calls
      58              :  * to private virtual functions meant to be overridden in derived
      59              :  * classes. The same applies to this class's implementation of the
      60              :  * FragmentGenerator::getNext() pure virtual function, which is
      61              :  * declared final (i.e., non-overridable in derived classes) and which
      62              :  * itself calls a pure virtual getNext_() function to be implemented
      63              :  * in derived classes.
      64              :  *
      65              :  * State-machine related interface functions will be called only from a
      66              :  * single thread. getNext() will be called only from a single
      67              :  * thread. The thread from which state-machine interface functions are
      68              :  * called may be a different thread from the one that calls getNext().
      69              :  *
      70              :  * John F., 3/24/14
      71              :  *
      72              :  * After some discussion with Kurt, FragmentBuffer has
      73              :  * been updated such that it now contains a member vector
      74              :  * fragment_ids_ ; if "fragment_id" is set in the FHiCL document
      75              :  * controlling a class derived from FragmentBuffer,
      76              :  * fragment_ids_ will be booked as a length-1 vector, and the value in
      77              :  * this vector will be returned by fragment_id(). fragment_id() will
      78              :  * throw an exception if the length of the vector isn't 1. If
      79              :  * "fragment_ids" is set in the FHiCL document, then fragment_ids_ is
      80              :  * filled with the values in the list which "fragment_ids" refers to,
      81              :  * otherwise it is set to the empty vector (this is what should happen
      82              :  * if the user sets the "fragment_id" variable in the FHiCL document,
      83              :  * otherwise exceptions will end up thrown due to the logical
      84              :  * conflict). If neither "fragment_id" nor "fragment_ids" is set in
      85              :  * the FHiCL document, writers of classes derived from this one will
      86              :  * be expected to override the virtual fragmentIDs() function with
      87              :  * their own code (the CompositeDriver class is an example of this)
      88              :  */
      89              : class FragmentBuffer
      90              : {
      91              : public:
      92              :         /// <summary>
      93              :         /// Configuration of the FragmentBuffer. May be used for parameter validation
      94              :         /// </summary>
      95              :         struct Config
      96              :         {
      97              :                 /// "generator" (REQUIRED) Name of the FragmentBuffer plugin to load
      98              :                 fhicl::Atom<std::string> generator_type{fhicl::Name{"generator"}, fhicl::Comment{"Name of the FragmentBuffer plugin to load"}};
      99              :                 /// "request_window_offset" (Default: 0) : Request messages contain a timestamp. For Window request mode, start the window this far before the timestamp in the request
     100              :                 fhicl::Atom<Fragment::timestamp_t> request_window_offset{fhicl::Name{"request_window_offset"}, fhicl::Comment{"Request messages contain a timestamp. For Window request mode, start the window this far before the timestamp in the request"}, 0};
     101              :                 /// "request_window_width" (Default: 0) : For Window request mode, the window will be timestamp - offset to timestamp - offset + width
     102              :                 fhicl::Atom<Fragment::timestamp_t> request_window_width{fhicl::Name{"request_window_width"}, fhicl::Comment{"For Window request mode, the window will be timestamp - offset to timestamp - offset + width"}, 0};
     103              :                 /// "stale_fragment_timeout" (Default: 0) : Fragments stored in the fragment generator which are older than the newest stored fragment by at least stale_fragment_timeout units of request timestamp ticks will get discarded (0 to disable)
     104              :                 fhicl::Atom<Fragment::timestamp_t> stale_fragment_timeout{fhicl::Name{"stale_fragment_timeout"}, fhicl::Comment{"Fragments stored in the fragment generator which are older than the newest stored fragment by at least stale_fragment_timeout units of request timestamp ticks will get discarded"}, 0};
     105              :                 /// "buffer_mode_keep_latest" (Default: false): Keep the latest Fragment when running in Buffer mode, so that each response has at least one Fragment (Fragment will be discarded if new data arrives before next request)
     106              :                 fhicl::Atom<bool> buffer_mode_keep_latest{fhicl::Name{"buffer_mode_keep_latest"}, fhicl::Comment{"Keep the latest Fragment when running in Buffer mode, so that each response has at least one Fragment (Fragment will be discarded if new data arrives before next request)"}, false};
     107              :                 /// "expected_fragment_type" (Default: 231, EmptyFragmentType) : The type of Fragments this CFG will be generating. "Empty" will auto-detect type based on Fragments generated.
     108              :                 fhicl::Atom<Fragment::type_t> expected_fragment_type{fhicl::Name{"expected_fragment_type"}, fhicl::Comment{"The type of Fragments this CFG will be generating. \"Empty\" will auto-detect type based on Fragments generated."}, Fragment::type_t(Fragment::EmptyFragmentType)};
     109              :                 /// "request_windows_are_unique" (Default: true) : Whether Fragments should be removed from the buffer when matched to a request window
     110              :                 fhicl::Atom<bool> request_windows_are_unique{fhicl::Name{"request_windows_are_unique"}, fhicl::Comment{"Whether Fragments should be removed from the buffer when matched to a request window"}, true};
     111              :                 /// "missing_request_window_timeout_us" (Default: 5000000) : How long to track missing requests in the "out-of-order Windows" list
     112              :                 fhicl::Atom<size_t> missing_request_window_timeout_us{fhicl::Name{"missing_request_window_timeout_us"}, fhicl::Comment{"How long to track missing requests in the \"out - of - order Windows\" list"}, 5000000};
     113              :                 /// "window_close_timeout_us" (Default: 2000000) : How long to wait for the end of the data buffer to pass the end of a request window (measured from the time the request was received)
     114              :                 fhicl::Atom<size_t> window_close_timeout_us{fhicl::Name{"window_close_timeout_us"}, fhicl::Comment{"How long to wait for the end of the data buffer to pass the end of a request window (measured from the time the request was received)"}, 2000000};
     115              :                 /// "separate_data_thread" (Default: false) : Whether data collection should proceed on its own thread. Required for all data request processing
     116              :                 fhicl::Atom<bool> separate_data_thread{fhicl::Name{"separate_data_thread"}, fhicl::Comment{"Whether data collection should proceed on its own thread. Required for all data request processing"}, false};
     117              :                 /// "circular_buffer_mode" (Default: false) : Whether the data buffer should be treated as a circular buffer on the input side (i.e. old fragments are automatically discarded when the buffer is full to always call getNext_).
     118              :                 fhicl::Atom<bool> circular_buffer_mode{fhicl::Name{"circular_buffer_mode"}, fhicl::Comment{"Whether the data buffer should be treated as a circular buffer on the input side (i.e. old fragments are automatically discarded when the buffer is full to always call getNext_)."}, false};
     119              :                 /// "sleep_on_no_data_us" (Default: 0 (no sleep)) : How long to sleep after calling getNext_ if no data is returned
     120              :                 fhicl::Atom<size_t> sleep_on_no_data_us{fhicl::Name{"sleep_on_no_data_us"}, fhicl::Comment{"How long to sleep after calling getNext_ if no data is returned"}, 0};
     121              :                 /// "data_buffer_depth_fragments" (Default: 1000) : The max fragments which can be stored before dropping occurs
     122              :                 fhicl::Atom<int> data_buffer_depth_fragments{fhicl::Name{"data_buffer_depth_fragments"}, fhicl::Comment{"The max fragments which can be stored before dropping occurs"}, 1000};
     123              :                 /// "data_buffer_depth_mb" (Default: 1000) : The max cumulative size in megabytes of the fragments which can be stored before dropping occurs
     124              :                 fhicl::Atom<size_t> data_buffer_depth_mb{fhicl::Name{"data_buffer_depth_mb"}, fhicl::Comment{"The max cumulative size in megabytes of the fragments which can be stored before dropping occurs"}, 1000};
     125              :                 /// "separate_monitoring_thread" (Default: false) : Whether a thread that calls the checkHWStatus_ method should be created
     126              :                 fhicl::Atom<bool> separate_monitoring_thread{fhicl::Name{"separate_monitoring_thread"}, fhicl::Comment{"Whether a thread that calls the checkHWStatus_ method should be created"}, false};
     127              :                 /// "hardware_poll_interval_us" (Default: 0) : If a separate monitoring thread is used, how often should it call checkHWStatus_
     128              :                 fhicl::Atom<int64_t> hardware_poll_interval_us{fhicl::Name{"hardware_poll_interval_us"}, fhicl::Comment{"If a separate monitoring thread is used, how often should it call checkHWStatus_"}, 0};
     129              :                 /// "fragment_ids" (Default: empty vector) : A list of Fragment IDs created by this FragmentBuffer
     130              :                 /// Note that only one of fragment_ids and fragment_id should be specified in the configuration
     131              :                 fhicl::Sequence<Fragment::fragment_id_t> fragment_ids{fhicl::Name("fragment_ids"), fhicl::Comment("A list of Fragment IDs created by this FragmentBuffer")};
     132              :                 /// "fragment_id" (Default: -99) : The Fragment ID created by this FragmentBuffer
     133              :                 /// Note that only one of fragment_ids and fragment_id should be specified in the configuration
     134              :                 fhicl::Atom<int> fragment_id{fhicl::Name{"fragment_id"}, fhicl::Comment{"The Fragment ID created by this FragmentBuffer"}, -99};
     135              :                 /// "sleep_on_stop_us" (Default: 0) : How long to sleep before returning when stop transition is called
     136              :                 fhicl::Atom<int> sleep_on_stop_us{fhicl::Name{"sleep_on_stop_us"}, fhicl::Comment{"How long to sleep before returning when stop transition is called"}, 0};
     137              :                 /// <summary>
     138              :                 /// "request_mode" (Default: Ignored) : The mode by which the FragmentBuffer will process requests
     139              :                 /// Ignored : Request messages are ignored. This is a "push" FragmentBuffer
     140              :                 /// Single : The FragmentBuffer responds to each request with the latest Fragment it has received
     141              :                 /// Buffer : The FragmentBuffer responds to each request with all Fragments it has received since the last request
     142              :                 /// Window : The FragmentBuffer searches its data buffer for all Fragments whose timestamp falls within the request window
     143              :                 /// SequenceID : The FragmentBuffer responds to each request with all Fragments that match the sequence ID in the request
     144              :                 /// </summary>
     145              :                 fhicl::Atom<std::string> request_mode{fhicl::Name{"request_mode"}, fhicl::Comment{"The mode by which the FragmentBuffer will process reqeusts"}, "ignored"};
     146              :         };
     147              :         /// Used for ParameterSet validation (if desired)
     148              :         using Parameters = fhicl::WrappedTable<Config>;
     149              : 
     150              :         /**
     151              :          * \brief FragmentBuffer Constructor
     152              :          * \param ps ParameterSet used to configure FragmentBuffer. See artdaq::FragmentBuffer::Config.
     153              :          */
     154              :         explicit FragmentBuffer(const fhicl::ParameterSet& ps);
     155              : 
     156              :         /**
     157              :          * \brief FragmentBuffer Destructor
     158              :          *
     159              :          * Joins all threads before returning
     160              :          */
     161              :         virtual ~FragmentBuffer();
     162              : 
     163              :         /**
     164              :          * @brief Add Fragments to the FragmentBuffer
     165              :          * @param frags Fragments to add
     166              :          */
     167              :         void AddFragmentsToBuffer(FragmentPtrs frags);
     168              : 
     169              :         /**
     170              :          * @brief Inform the FragmentBuffer that it should stop (sets the should_stop_ flag to true)
     171              :          */
     172            2 :         void Stop() { should_stop_ = true; }
     173              : 
     174              :         /**
     175              :          * @brief Reset the FragmentBuffer (flushes all Fragments from buffers)
     176              :          * @param stop Whether the FragmentBuffer should be stopped during the Reset
     177              :          */
     178              :         void Reset(bool stop);
     179              : 
     180              :         /// <summary>
     181              :         /// Create fragments using data buffer for request mode Ignored.
     182              :         /// Precondition: dataBufferMutex_ and request_mutex_ are locked
     183              :         /// </summary>
     184              :         /// <param name="frags">Output fragments</param>
     185              :         void applyRequestsIgnoredMode(artdaq::FragmentPtrs& frags);
     186              : 
     187              :         /// <summary>
     188              :         /// Create fragments using data buffer for request mode Single.
     189              :         /// Precondition: dataBufferMutex_ and request_mutex_ are locked
     190              :         /// </summary>
     191              :         /// <param name="frags">Output fragments</param>
     192              :         void applyRequestsSingleMode(artdaq::FragmentPtrs& frags);
     193              : 
     194              :         /// <summary>
     195              :         /// Create fragments using data buffer for request mode Buffer.
     196              :         /// Precondition: dataBufferMutex_ and request_mutex_ are locked
     197              :         /// </summary>
     198              :         /// <param name="frags">Output fragments</param>
     199              :         void applyRequestsBufferMode(artdaq::FragmentPtrs& frags);
     200              : 
     201              :         /// <summary>
     202              :         /// Create fragments using data buffer for request mode Window.
     203              :         /// Precondition: dataBufferMutex_ and request_mutex_ are locked
     204              :         /// </summary>
     205              :         /// <param name="frags">Output fragments</param>
     206              :         void applyRequestsWindowMode(artdaq::FragmentPtrs& frags);
     207              : 
     208              :         /// <summary>
     209              :         /// Create fragments using data buffer for request mode SequenceID.
     210              :         /// Precondition: dataBufferMutex_ and request_mutex_ are locked
     211              :         /// </summary>
     212              :         /// <param name="frags">Output fragments</param>
     213              :         void applyRequestsSequenceIDMode(artdaq::FragmentPtrs& frags);
     214              : 
     215              :         /// <summary>Copy data from the relevant data buffer that matches the given timestamp.
     216              :         /// </summary>
     217              :         /// <param name="frags">Output Fragments</param>
     218              :         /// <param name="id">Fragment ID of buffer to search</param>
     219              :         /// <param name="seq">Sequence ID of output Fragment</param>
     220              :         /// <param name="ts">Timestamp of output Fragment (used to determine window limits)</param>
     221              :         void applyRequestsWindowMode_CheckAndFillDataBuffer(artdaq::FragmentPtrs& frags, artdaq::Fragment::fragment_id_t id, artdaq::Fragment::sequence_id_t seq, artdaq::Fragment::timestamp_t ts);
     222              : 
     223              :         /**
     224              :          * \brief See if any requests have been received, and add the corresponding data Fragment objects to the output list
     225              :          * \param[out] frags list of FragmentPtr objects ready for transmission
     226              :          * \return True if not stopped
     227              :          */
     228              :         bool applyRequests(FragmentPtrs& frags);
     229              : 
     230              :         /**
     231              :          * \brief Send an EmptyFragmentType Fragment
     232              :          * \param[out] frags Output list to append EmptyFragmentType to
     233              :          * \param sequenceId Sequence ID of Empty Fragment
     234              :          * \param fragmentId Fragment ID of Empty Fragment
     235              :          * \param desc Message to log with reasoning for sending Empty Fragment
     236              :          * \return True if no exceptions
     237              :          */
     238              :         bool sendEmptyFragment(FragmentPtrs& frags, size_t sequenceId, Fragment::fragment_id_t fragmentId, std::string desc);
     239              : 
     240              :         /**
     241              :          * \brief This function is for Buffer and Single request modes, as they can only respond to one data request at a time
     242              :          * If the request message seqID > ev_counter, simply send empties until they're equal
     243              :          * \param[out] frags Output list to append EmptyFragmentType to
     244              :          * \param requests List of requests to process
     245              :          */
     246              :         void sendEmptyFragments(FragmentPtrs& frags, std::map<Fragment::sequence_id_t, Fragment::timestamp_t>& requests);
     247              : 
     248              :         /**
     249              :          * \brief Check the windows_sent_ooo_ map for sequence IDs that may be removed
     250              :          * \param seq Sequence ID of current window
     251              :          */
     252              :         void checkSentWindows(Fragment::sequence_id_t seq);
     253              : 
     254              :         /**
     255              :          * \brief Wait for the data buffer to drain (dataBufferIsTooLarge returns false), periodically reporting status.
     256              :          * \param id Fragment ID of data buffer
     257              :          * \return True if wait ended without something else disrupting the run
     258              :          */
     259              :         bool waitForDataBufferReady(Fragment::fragment_id_t id);
     260              : 
     261              :         /**
     262              :          * \brief Test the configured constraints on the data buffer
     263              :          * \param id Fragment ID of data buffer
     264              :          * \return Whether the data buffer is full
     265              :          */
     266              :         bool dataBufferIsTooLarge(Fragment::fragment_id_t id);
     267              : 
     268              :         /**
     269              :          * \brief Calculate the size of the dataBuffer and report appropriate metrics
     270              :          * \param id Fragment ID of buffer
     271              :          */
     272              :         void getDataBufferStats(Fragment::fragment_id_t id);
     273              : 
     274              :         /**
     275              :          * \brief Calculate the size of all dataBuffers and report appropriate metrics (calls getDataBufferStats for every Fragment ID in dataBuffers_)
     276              :          */
     277           84 :         void getDataBuffersStats()
     278              :         {
     279          222 :                 for (auto& id : dataBuffers_) getDataBufferStats(id.first);
     280           84 :         }
     281              : 
     282              :         /**
     283              :          * \brief report statistics as a string
     284              :          */
     285              :         std::string getStatReport();
     286              : 
     287              :         /**
     288              :          * \brief Perform data buffer pruning operations for the given buffer. If the RequestMode is Single, removes all but the latest Fragment from the data buffer.
     289              :          * \param id Fragment ID of buffer
     290              :          * In Window and Buffer RequestModes, this function discards the oldest Fragment objects until the data buffer is below its size constraints,
     291              :          * then also checks for stale Fragments, based on the timestamp of the most recent Fragment.
     292              :          */
     293              :         void checkDataBuffer(Fragment::fragment_id_t id);
     294              : 
     295              :         /**
     296              :          * \brief Perform data buffer pruning operations for all buffers (calls checkDataBuffer for every Fragment ID in dataBuffers_).
     297              :          */
     298          179 :         void checkDataBuffers()
     299              :         {
     300          410 :                 for (auto& id : dataBuffers_) checkDataBuffer(id.first);
     301          179 :         }
     302              : 
     303              :         /**
     304              :          * \brief Get the map of Window-mode requests fulfilled by this Fragment Generator for the given Fragment ID
     305              :          * \param id Fragment ID of buffer
     306              :          * \return Copy of the map of sequence_id and time_point for sent Window-mode requests
     307              :          * \throws cet::exception("DataBufferError") if dataBuffers_ has no entry for the given Fragment ID
     308              :          * \note This function is used in FragmentBuffer_t to verify correct functioning of Window mode
     309              :          */
     310           10 :         std::map<Fragment::sequence_id_t, std::chrono::steady_clock::time_point> GetSentWindowList(Fragment::fragment_id_t id)
     311              :         {
     312           10 :                 if (!dataBuffers_.count(id))  // Reject unknown IDs before the lookup below
     313              :                 {
     314            0 :                         throw cet::exception("DataBufferError") << "Error in FragmentBuffer: Cannot get Sent Windows for ID " << id << " because it does not exist!";
     315              :                 }
     316              : 
     317           10 :                 return dataBuffers_[id]->WindowsSent;  // Returned by value, so the caller receives a copy
     318              :         }
     319              : 
     320              :         /**
     321              :          * \brief Get the list of Fragment IDs handled by this FragmentBuffer
     322              :          * \return A std::vector<Fragment::fragment_id_t> containing the key of every entry in dataBuffers_, in iteration order
     323              :          */
     324              :         std::vector<Fragment::fragment_id_t> fragmentIDs()
     325              :         {
     326              :                 std::vector<Fragment::fragment_id_t> output;
     327              : 
     328              :                 for (auto& id : dataBuffers_)
     329              :                 {
     330              :                         output.push_back(id.first);
     331              :                 }
     332              : 
     333              :                 return output;
     334              :         }
     335              : 
     336              :         /// <summary>
     337              :         /// Get the current request mode of the FragmentBuffer
     338              :         /// </summary>
     339              :         /// <returns>Current RequestMode (mode_) of this FragmentBuffer</returns>
     340            2 :         RequestMode request_mode() const { return mode_; }
     341              : 
     342              :         // The following functions are not yet implemented, and their
     343              :         // signatures may be subject to change.
     344              : 
     345              :         // John F., 12/6/13 -- do we want Reset and Shutdown commands?
     346              :         // Kurt B., 15-Feb-2014. For the moment, I suspect that we don't
     347              :         // want a Shutdown command. FragmentGenerator instances are
     348              :         // Constructed at Initialization time, and they are destructed
     349              :         // at Shutdown time. So, any shutdown operations that need to be
     350              :         // done should be put in the FragmentGenerator child class
     351              :         // destructors. If we find that want shutdown (or initialization)
     352              :         // operations that are different from destruction (construction),
     353              :         // then we'll have to add InitCmd and ShutdownCmd methods.
     354              : 
     355              :         //    virtual void ResetCmd() final {}
     356              :         //    virtual void ShutdownCmd() final {}
     357              : 
     358              :         /**
     359              :          * @brief Set the pointer to the RequestBuffer used to retrieve requests
     360              :          * @param buffer Shared pointer to the RequestBuffer; stored in requestBuffer_
     361              :          */
     362           25 :         void SetRequestBuffer(std::shared_ptr<RequestBuffer> buffer) { requestBuffer_ = buffer; }
     363              : 
     364              :         /**
     365              :          * @brief Get the next sequence ID expected by this FragmentBuffer. This is used to track sent windows and missed requests
     366              :          * @return The next sequence ID expected by this FragmentBuffer (the current value of next_sequence_id_)
     367              :          */
     368          216 :         artdaq::Fragment::sequence_id_t GetNextSequenceID() const { return next_sequence_id_; }
     369              : 
     370              : protected:
     371              :         // John F., 12/6/13 -- need to figure out which of these getter
     372              :         // functions should be promoted to "public"
     373              : 
     374              :         // John F., 1/21/15 -- after more than a year, there hasn't been a
     375              :         // single complaint that a FragmentBuffer-derived
     376              :         // class hasn't allowed its users to access these quantities, so
     377              :         // they're probably fine as is
     378              : 
     379              :         /**
     380              :          * \brief Get the Fragment ID of this Fragment generator
     381              :          * \throws cet::exception("FragmentID") if there is more than one Fragment ID configured for this Fragment Generator
     382              :          * \return Fragment ID for the Fragment Generator (the key of the first entry in dataBuffers_)
     383              :          */
     384              :         artdaq::Fragment::fragment_id_t fragment_id() const
     385              :         {
     386              :                 if (dataBuffers_.size() > 1) throw cet::exception("FragmentID") << "fragment_id() was called, indicating that Fragment Generator was expecting one and only one Fragment ID, but " << dataBuffers_.size() << " were declared!";
     387              :                 return (*dataBuffers_.begin()).first;  // NOTE(review): only size() > 1 is rejected above; if dataBuffers_ can be empty this dereferences begin() == end() -- confirm callers guarantee at least one Fragment ID
     388              :         }
     389              : 
     390              :         /**
     391              :          * \brief Routine used by applyRequests to make sure that all outstanding requests have been fulfilled before returning
     392              :          * \return The logical AND of should_stop, mode is not Ignored, and requests list size equal to 0
     393              :          */
     394              :         bool check_stop();
     395              : 
     396              :         /**
     397              :          * \brief Return the string representation of the current RequestMode
     398              :          * \return The string representation of the current RequestMode
     399              :          */
     400              :         std::string printMode_();
     401              : 
     402              :         /**
     403              :          * \brief Get the total number of Fragments in all data buffers
     404              :          * \return Number of Fragments in all data buffers
     405              :          */
     406              :         size_t dataBufferFragmentCount_();
     407              : 
     408              : private:
     409              :         // FHiCL-configurable variables. Note that the C++ variable names
     410              :         // are the FHiCL variable names with a "_" appended
     411              : 
     412              :         // Request-tracking state
     413              :         Fragment::sequence_id_t next_sequence_id_;
     414              :         std::shared_ptr<RequestBuffer> requestBuffer_;
     415              : 
     416              :         RequestMode mode_;
     417              :         bool bufferModeKeepLatest_;
     418              :         Fragment::timestamp_t windowOffset_;
     419              :         Fragment::timestamp_t windowWidth_;
     420              :         Fragment::timestamp_t staleTimeout_;
     421              :         Fragment::type_t expectedType_;
     422              :         bool uniqueWindows_;
     423              :         bool sendMissingFragments_;
     424              :         size_t missing_request_window_timeout_us_;
     425              :         size_t window_close_timeout_us_;
     426              :         bool error_on_empty_;
     427              : 
     428              :         bool circularDataBufferMode_;
     429              : 
     430              :         std::mutex dataConditionMutex_;
     431              :         std::condition_variable dataCondition_;
     432              :         int maxDataBufferDepthFragments_;
     433              :         size_t maxDataBufferDepthBytes_;
     434              : 
     435              :         struct DataBuffer  ///< Buffer state held per Fragment ID; scalar members carry default initializers so a default-initialized instance never holds indeterminate values
     436              :         {
     437              :                 std::atomic<int> DataBufferDepthFragments{0};  ///< Buffer depth counted in Fragments (atomic), starts at 0
     438              :                 std::atomic<size_t> DataBufferDepthBytes{0};  ///< Buffer depth counted in bytes (atomic), starts at 0
     439              :                 std::map<Fragment::sequence_id_t, std::chrono::steady_clock::time_point> WindowsSent;  ///< steady_clock time point recorded per sequence ID
     440              :                 bool BufferFragmentKept{false};  ///< Starts false
     441              :                 Fragment::sequence_id_t HighestRequestSeen{0};  ///< Starts at 0
     442              :                 FragmentPtrs DataBuffer;  ///< The buffered Fragments
     443              :                 std::mutex DataBufferMutex;  ///< NOTE(review): presumably guards the members of this struct -- confirm against the .cc
     444              :         };
     445              : 
     446              :         std::mutex systemFragmentMutex_;
     447              :         FragmentPtrs systemFragments_;
     448              :         std::atomic<size_t> systemFragmentCount_;
     449              : 
     450              :         std::unordered_map<artdaq::Fragment::fragment_id_t, std::shared_ptr<DataBuffer>> dataBuffers_;
     451              : 
     452              :         std::atomic<bool> should_stop_;
     453              : };
     454              : }  // namespace artdaq
     455              : 
     456              : #endif /* artdaq_Application_FragmentBuffer_hh */
        

Generated by: LCOV version 2.0-1