Line data Source code
1 : #include "artdaq/DAQdata/Globals.hh"
2 : #define TRACE_NAME (app_name + "_RequestBuffer").c_str() // include these 2 first -
3 :
4 : #include "artdaq/DAQrate/RequestBuffer.hh"
5 :
6 27 : artdaq::RequestBuffer::RequestBuffer(artdaq::Fragment::sequence_id_t request_increment)
7 :
8 27 : : requests_()
9 27 : , request_timing_()
10 27 : , highest_seen_request_(0)
11 27 : , last_next_request_(0)
12 27 : , out_of_order_requests_()
13 27 : , request_increment_(request_increment)
14 54 : , receiver_running_(false)
15 : {
16 27 : }
17 :
18 27 : artdaq::RequestBuffer::~RequestBuffer() {}
19 :
20 400062 : void artdaq::RequestBuffer::push(artdaq::Fragment::sequence_id_t seq, artdaq::Fragment::timestamp_t ts)
21 : {
22 400062 : std::lock_guard<std::mutex> tlk(request_mutex_);
23 400062 : if (requests_.count(seq) && requests_[seq] != ts)
24 : {
25 0 : TLOG(TLVL_ERROR) << "Received conflicting request for SeqID "
26 0 : << seq << "!"
27 0 : << " Old ts=" << requests_[seq]
28 0 : << ", new ts=" << ts << ". Keeping OLD!";
29 : }
30 400062 : else if (!requests_.count(seq))
31 : {
32 400062 : int delta = seq - highest_seen_request_;
33 800124 : TLOG(TLVL_DEBUG + 36) << "Received request for sequence ID " << seq
34 400062 : << " and timestamp " << ts << " (delta: " << delta << ")";
35 400062 : if (delta <= 0 || out_of_order_requests_.count(seq))
36 : {
37 0 : TLOG(TLVL_DEBUG + 36) << "Already serviced this request ( sequence ID " << seq << ")! Ignoring...";
38 : }
39 : else
40 : {
41 400062 : requests_[seq] = ts;
42 400062 : request_timing_[seq] = std::chrono::steady_clock::now();
43 : }
44 : }
45 400062 : request_cv_.notify_all();
46 400062 : }
47 :
48 1 : void artdaq::RequestBuffer::reset()
49 : {
50 1 : std::lock_guard<std::mutex> lk(request_mutex_);
51 1 : requests_.clear();
52 1 : request_timing_.clear();
53 1 : highest_seen_request_ = 0;
54 1 : last_next_request_ = 0;
55 1 : out_of_order_requests_.clear();
56 1 : }
57 :
58 : /// <summary>
59 : /// Get the current requests
60 : /// </summary>
61 : /// <returns>Map relating sequence IDs to timestamps</returns>
62 :
63 79 : std::map<artdaq::Fragment::sequence_id_t, artdaq::Fragment::timestamp_t> artdaq::RequestBuffer::GetRequests() const
64 : {
65 79 : std::lock_guard<std::mutex> lk(request_mutex_);
66 79 : std::map<artdaq::Fragment::sequence_id_t, Fragment::timestamp_t> out;
67 300151 : for (auto& in : requests_)
68 : {
69 300072 : out[in.first] = in.second;
70 : }
71 158 : return out;
72 79 : }
73 :
74 0 : std::pair<artdaq::Fragment::sequence_id_t, artdaq::Fragment::timestamp_t> artdaq::RequestBuffer::GetNextRequest()
75 : {
76 0 : std::lock_guard<std::mutex> lk(request_mutex_);
77 :
78 0 : auto it = requests_.begin();
79 0 : while (it != requests_.end() && it->first <= last_next_request_) { ++it; }
80 :
81 0 : if (it == requests_.end())
82 : {
83 0 : return std::make_pair<artdaq::Fragment::sequence_id_t, artdaq::Fragment::timestamp_t>(0, 0);
84 : }
85 :
86 0 : last_next_request_ = it->first;
87 0 : return *it;
88 0 : }
89 :
90 300076 : void artdaq::RequestBuffer::RemoveRequest(artdaq::Fragment::sequence_id_t reqID)
91 : {
92 600152 : TLOG(TLVL_DEBUG + 35) << "RemoveRequest: Removing request for id " << reqID;
93 300076 : std::lock_guard<std::mutex> lk(request_mutex_);
94 300076 : requests_.erase(reqID);
95 :
96 300076 : if (reqID > highest_seen_request_)
97 : {
98 600136 : TLOG(TLVL_DEBUG + 35) << "RemoveRequest: out_of_order_requests_.size() == " << out_of_order_requests_.size() << ", reqID=" << reqID << ", expected=" << highest_seen_request_ + request_increment_;
99 300068 : if (out_of_order_requests_.size() || reqID != highest_seen_request_ + request_increment_)
100 : {
101 32 : out_of_order_requests_.insert(reqID);
102 :
103 32 : auto it = out_of_order_requests_.begin();
104 46 : while (it != out_of_order_requests_.end()) // Stop accounting for requests after stop
105 : {
106 40 : if (*it == highest_seen_request_ + request_increment_)
107 : {
108 14 : highest_seen_request_ = *it;
109 14 : it = out_of_order_requests_.erase(it);
110 : }
111 : else
112 : {
113 26 : break;
114 : }
115 : }
116 : }
117 : else // no out-of-order requests and this request is highest seen + request_increment_
118 : {
119 300036 : highest_seen_request_ = reqID;
120 : }
121 600136 : TLOG(TLVL_DEBUG + 35) << "RemoveRequest: reqID=" << reqID << " Setting highest_seen_request_ to " << highest_seen_request_;
122 : }
123 300076 : if (metricMan && request_timing_.count(reqID))
124 : {
125 2100406 : metricMan->sendMetric("Request Response Time", TimeUtils::GetElapsedTime(request_timing_[reqID]), "seconds", 2, MetricMode::Average);
126 : }
127 300076 : request_timing_.erase(reqID);
128 300076 : }
129 :
130 : /// <summary>
131 : /// Clear all requests from the map
132 : /// </summary>
133 :
134 0 : void artdaq::RequestBuffer::ClearRequests()
135 : {
136 0 : std::lock_guard<std::mutex> lk(request_mutex_);
137 0 : requests_.clear();
138 0 : }
139 :
140 : /// <summary>
141 : /// Get the current requests, then clear the map
142 : /// </summary>
143 : /// <returns>Map relating sequence IDs to timestamps</returns>
144 :
145 0 : std::map<artdaq::Fragment::sequence_id_t, artdaq::Fragment::timestamp_t> artdaq::RequestBuffer::GetAndClearRequests()
146 : {
147 0 : std::lock_guard<std::mutex> lk(request_mutex_);
148 0 : std::map<artdaq::Fragment::sequence_id_t, Fragment::timestamp_t> out;
149 0 : for (auto& in : requests_)
150 : {
151 0 : out[in.first] = in.second;
152 : }
153 0 : if (requests_.size()) { highest_seen_request_ = requests_.rbegin()->first; }
154 0 : out_of_order_requests_.clear();
155 0 : requests_.clear();
156 0 : request_timing_.clear();
157 0 : return out;
158 0 : }
159 :
160 : /// <summary>
161 : /// Get the number of requests currently stored in the RequestReceiver
162 : /// </summary>
163 : /// <returns>The number of requests stored in the RequestReceiver</returns>
164 :
165 179 : size_t artdaq::RequestBuffer::size()
166 : {
167 179 : std::lock_guard<std::mutex> tlk(request_mutex_);
168 358 : return requests_.size();
169 179 : }
170 :
171 : /// <summary>
172 : /// Wait for a new request message, up to the timeout given
173 : /// </summary>
174 : /// <param name="timeout_ms">Milliseconds to wait for a new request to arrive</param>
175 : /// <returns>True if any requests are present in the request map</returns>
176 :
177 100 : bool artdaq::RequestBuffer::WaitForRequests(int timeout_ms)
178 : {
179 100 : std::unique_lock<std::mutex> lk(request_mutex_); // Lock needed by wait_for
180 : // See if we have to wait at all
181 100 : if (requests_.size() > 0) return true;
182 : // If we do have to wait, check requests_.size to make sure we're not being notified spuriously
183 300 : return request_cv_.wait_for(lk, std::chrono::milliseconds(timeout_ms), [this]() { return requests_.size() > 0; });
184 100 : }
185 :
186 : /// <summary>
187 : /// Get the time a given request was received
188 : /// </summary>
189 : /// <param name="reqID">Request ID of the request</param>
190 : /// <returns>steady_clock::time_point corresponding to when the request was received</returns>
191 :
192 30 : std::chrono::steady_clock::time_point artdaq::RequestBuffer::GetRequestTime(artdaq::Fragment::sequence_id_t reqID)
193 : {
194 30 : std::lock_guard<std::mutex> lk(request_mutex_);
195 60 : return request_timing_.count(reqID) ? request_timing_[reqID] : std::chrono::steady_clock::now();
196 30 : }
|