Line data Source code
1 : #ifndef _artdaq_DAQrate_RequestBuffer_hh_
2 : #define _artdaq_DAQrate_RequestBuffer_hh_
3 :
4 : #include "TRACE/tracemf.h" // Pre-empt TRACE/trace.h from Fragment.hh.
5 : #include "artdaq-core/Data/Fragment.hh"
6 :
7 : #include <condition_variable>
8 : #include <map>
9 : #include <set>
10 :
11 : namespace artdaq {
12 : /**
13 : * @brief Holds requests from RequestReceiver while they are being processed
14 : */
15 : class RequestBuffer
16 : {
17 : public:
18 : /**
19 : * @brief RequestBuffer Constructor
20 : * @param request_increment Expected increase in request sequence ID each request
21 : */
22 : explicit RequestBuffer(Fragment::sequence_id_t request_increment = 1);
23 :
24 : /**
25 : * @brief RequestBuffer Destructor
26 : */
27 : virtual ~RequestBuffer();
28 :
29 : /**
30 : * @brief Add a Request to the buffer
31 : * @param seq Sequence ID of the request
32 : * @param ts Timestamp for the request
33 : */
34 : void push(artdaq::Fragment::sequence_id_t seq, artdaq::Fragment::timestamp_t ts);
35 :
36 : /**
37 : * @brief Reset RequestBuffer, discarding all requests and tracking information
38 : */
39 : void reset();
40 :
41 : /// <summary>
42 : /// Get the current requests
43 : /// </summary>
44 : /// <returns>Map relating sequence IDs to timestamps</returns>
45 : std::map<artdaq::Fragment::sequence_id_t, artdaq::Fragment::timestamp_t> GetRequests() const;
46 :
47 : /// <summary>
48 : /// Get the "next" request, i.e. the first unsatisfied request that has not already been returned by GetNextRequest
49 : /// </summary>
50 : /// <returns>Request data for "next" request. Will return (0,0) if there is no "next" request</returns>
51 : ///
52 : /// This function uses last_next_request_ to ensure that it does not return the same request more than once
53 : std::pair<artdaq::Fragment::sequence_id_t, artdaq::Fragment::timestamp_t> GetNextRequest();
54 :
55 : /// <summary>
56 : /// Remove the request with the given sequence ID from the request map
57 : /// </summary>
58 : /// <param name="reqID">Request ID to remove</param>
59 : void RemoveRequest(artdaq::Fragment::sequence_id_t reqID);
60 :
61 : /// <summary>
62 : /// Clear all requests from the map
63 : /// </summary>
64 : void ClearRequests();
65 : /// <summary>
66 : /// Get the current requests, then clear the map
67 : /// </summary>
68 : /// <returns>Map relating sequence IDs to timestamps</returns>
69 : std::map<artdaq::Fragment::sequence_id_t, artdaq::Fragment::timestamp_t> GetAndClearRequests();
70 :
71 : /// <summary>
72 : /// Get the number of requests currently stored in the RequestReceiver
73 : /// </summary>
74 : /// <returns>The number of requests stored in the RequestReceiver</returns>
75 : size_t size();
76 : /// <summary>
77 : /// Wait for a new request message, up to the timeout given
78 : /// </summary>
79 : /// <param name="timeout_ms">Milliseconds to wait for a new request to arrive</param>
80 : /// <returns>True if any requests are present in the request map</returns>
81 : bool WaitForRequests(int timeout_ms);
82 :
83 : /// <summary>
84 : /// Get the time a given request was received
85 : /// </summary>
86 : /// <param name="reqID">Request ID of the request</param>
87 : /// <returns>steady_clock::time_point corresponding to when the request was received</returns>
88 : std::chrono::steady_clock::time_point GetRequestTime(artdaq::Fragment::sequence_id_t reqID);
89 :
90 : /**
91 : * @brief Determine whether the RequestBuffer is active
92 : * @return
93 : */
94 3 : bool isRunning() const { return receiver_running_; }
95 : /**
96 : * @brief Set whether the RequestBuffer is active
97 : * @param running Whether the RequestBuffer is active
98 : */
99 28 : void setRunning(bool running) { receiver_running_ = running; }
100 :
101 : private:
102 : std::map<artdaq::Fragment::sequence_id_t, artdaq::Fragment::timestamp_t> requests_;
103 : std::map<artdaq::Fragment::sequence_id_t, std::chrono::steady_clock::time_point> request_timing_;
104 : std::atomic<artdaq::Fragment::sequence_id_t> highest_seen_request_;
105 : std::atomic<artdaq::Fragment::sequence_id_t> last_next_request_; // The last request returned by GetNextRequest
106 : std::set<artdaq::Fragment::sequence_id_t> out_of_order_requests_;
107 : artdaq::Fragment::sequence_id_t request_increment_;
108 : mutable std::mutex request_mutex_;
109 : std::condition_variable request_cv_;
110 :
111 : std::atomic<bool> receiver_running_;
112 : };
113 : } // namespace artdaq
114 :
115 : #endif // _artdaq_DAQrate_RequestBuffer_hh_
|