Line data Source code
1 : #include "artdaq/DAQdata/Globals.hh"
2 : #define TRACE_NAME (app_name + "_FragmentBuffer").c_str() // include these 2 first -
3 :
4 : #include "artdaq/DAQrate/FragmentBuffer.hh"
5 :
6 : #include <boost/exception/all.hpp>
7 : #include <boost/throw_exception.hpp>
8 :
9 : #include <iterator>
10 : #include <limits>
11 :
12 : #include "canvas/Utilities/Exception.h"
13 : #include "cetlib_except/exception.h"
14 : #include "fhiclcpp/ParameterSet.h"
15 :
16 : #include "artdaq-core/Data/ContainerFragmentLoader.hh"
17 : #include "artdaq-core/Data/Fragment.hh"
18 : #include "artdaq-core/Utilities/ExceptionHandler.hh"
19 : #include "artdaq-core/Utilities/SimpleLookupPolicy.hh"
20 : #include "artdaq-core/Utilities/TimeUtils.hh"
21 :
22 : #include <sys/poll.h>
23 : #include <algorithm>
24 : #include <fstream>
25 : #include <iomanip>
26 : #include <iostream>
27 : #include <iterator>
28 : #include "artdaq/DAQdata/TCPConnect.hh"
29 :
30 : #define TLVL_ADDFRAGMENT 32
31 : #define TLVL_CHECKSTOP 33
32 : #define TLVL_WAITFORBUFFERREADY 34
33 : #define TLVL_GETBUFFERSTATS 35
34 : #define TLVL_CHECKDATABUFFER 36
35 : #define TLVL_APPLYREQUESTS 37
36 : #define TLVL_APPLYREQUESTS_VERBOSE 38
37 : #define TLVL_SENDEMPTYFRAGMENTS 39
38 : #define TLVL_CHECKWINDOWS 40
39 : #define TLVL_EMPTYFRAGMENT 41
40 :
41 28 : artdaq::FragmentBuffer::FragmentBuffer(const fhicl::ParameterSet& ps)
42 28 : : next_sequence_id_(1)
43 28 : , requestBuffer_()
44 56 : , bufferModeKeepLatest_(ps.get<bool>("buffer_mode_keep_latest", false))
45 56 : , windowOffset_(ps.get<Fragment::timestamp_t>("request_window_offset", 0))
46 56 : , windowWidth_(ps.get<Fragment::timestamp_t>("request_window_width", 0))
47 56 : , staleTimeout_(ps.get<Fragment::timestamp_t>("stale_fragment_timeout", 0))
48 56 : , expectedType_(ps.get<Fragment::type_t>("expected_fragment_type", Fragment::type_t(Fragment::EmptyFragmentType)))
49 56 : , uniqueWindows_(ps.get<bool>("request_windows_are_unique", true))
50 56 : , sendMissingFragments_(ps.get<bool>("send_missing_request_fragments", true))
51 56 : , missing_request_window_timeout_us_(ps.get<size_t>("missing_request_window_timeout_us", 5000000))
52 56 : , window_close_timeout_us_(ps.get<size_t>("window_close_timeout_us", 2000000))
53 56 : , error_on_empty_(ps.get<bool>("error_on_empty_fragment", false))
54 56 : , circularDataBufferMode_(ps.get<bool>("circular_buffer_mode", false))
55 56 : , maxDataBufferDepthFragments_(ps.get<int>("data_buffer_depth_fragments", 1000))
56 56 : , maxDataBufferDepthBytes_(ps.get<size_t>("data_buffer_depth_mb", 1000) * 1024 * 1024)
57 28 : , systemFragmentCount_(0)
58 112 : , should_stop_(false)
59 : {
60 84 : auto fragment_ids = ps.get<std::vector<artdaq::Fragment::fragment_id_t>>("fragment_ids", std::vector<artdaq::Fragment::fragment_id_t>());
61 :
62 56 : TLOG(TLVL_DEBUG + 33) << "artdaq::FragmentBuffer::FragmentBuffer(ps)";
63 56 : int fragment_id = ps.get<int>("fragment_id", -99);
64 :
65 28 : if (fragment_id != -99)
66 : {
67 22 : if (fragment_ids.size() != 0)
68 : {
69 1 : auto report = "Error in FragmentBuffer: can't both define \"fragment_id\" and \"fragment_ids\" in FHiCL document";
70 3 : TLOG(TLVL_ERROR) << report;
71 3 : throw cet::exception("FragmentBufferConfig") << report;
72 : }
73 : else
74 : {
75 21 : fragment_ids.emplace_back(fragment_id);
76 : }
77 : }
78 :
79 66 : for (auto& id : fragment_ids)
80 : {
81 39 : dataBuffers_[id] = std::make_shared<DataBuffer>();
82 39 : dataBuffers_[id]->DataBufferDepthBytes = 0;
83 39 : dataBuffers_[id]->DataBufferDepthFragments = 0;
84 39 : dataBuffers_[id]->HighestRequestSeen = 0;
85 39 : dataBuffers_[id]->BufferFragmentKept = false;
86 : }
87 :
88 81 : std::string modeString = ps.get<std::string>("request_mode", "ignored");
89 27 : if (modeString == "single" || modeString == "Single")
90 : {
91 3 : mode_ = RequestMode::Single;
92 : }
93 24 : else if (modeString.find("buffer") != std::string::npos || modeString.find("Buffer") != std::string::npos)
94 : {
95 5 : mode_ = RequestMode::Buffer;
96 : }
97 19 : else if (modeString == "window" || modeString == "Window")
98 : {
99 14 : mode_ = RequestMode::Window;
100 : }
101 5 : else if (modeString.find("ignore") != std::string::npos || modeString.find("Ignore") != std::string::npos)
102 : {
103 3 : mode_ = RequestMode::Ignored;
104 : }
105 2 : else if (modeString.find("sequence") != std::string::npos || modeString.find("Sequence") != std::string::npos)
106 : {
107 2 : mode_ = RequestMode::SequenceID;
108 : }
109 75 : if (mode_ != RequestMode::Ignored && !ps.get<bool>("receive_requests", false))
110 : {
111 3 : TLOG(TLVL_WARNING) << "Request Mode was requested as " << modeString << ", but is being set to Ignored because \"receive_requests\" was not set to true";
112 1 : mode_ = RequestMode::Ignored;
113 : }
114 54 : TLOG(TLVL_DEBUG + 32) << "Request mode is " << printMode_();
115 32 : }
116 :
117 27 : artdaq::FragmentBuffer::~FragmentBuffer()
118 : {
119 81 : TLOG(TLVL_INFO) << "Fragment Buffer Destructor; Clearing data buffers";
120 27 : Reset(true);
121 27 : }
122 :
123 99490266 : void artdaq::FragmentBuffer::Reset(bool stop)
124 : {
125 99490266 : should_stop_ = stop;
126 99490266 : next_sequence_id_ = 1;
127 198980544 : for (auto& id : dataBuffers_)
128 : {
129 99490278 : std::lock_guard<std::mutex> dlk(id.second->DataBufferMutex);
130 99490278 : id.second->DataBufferDepthBytes = 0;
131 99490278 : id.second->DataBufferDepthFragments = 0;
132 99490278 : id.second->BufferFragmentKept = false;
133 99490278 : id.second->DataBuffer.clear();
134 99490278 : }
135 :
136 : {
137 99490266 : std::lock_guard<std::mutex> lk(systemFragmentMutex_);
138 99490266 : systemFragments_.clear();
139 99490266 : systemFragmentCount_ = 0;
140 99490266 : }
141 99490266 : }
142 :
143 300051 : void artdaq::FragmentBuffer::AddFragmentsToBuffer(FragmentPtrs frags)
144 : {
145 300051 : std::unordered_map<Fragment::fragment_id_t, FragmentPtrs> frags_by_id;
146 800264 : while (!frags.empty())
147 : {
148 500213 : auto dataIter = frags.begin();
149 500213 : auto frag_id = (*dataIter)->fragmentID();
150 :
151 500213 : if (Fragment::isBroadcastFragmentType((*dataIter)->type()))
152 : {
153 0 : std::lock_guard<std::mutex> lk(systemFragmentMutex_);
154 0 : systemFragments_.emplace_back(std::move(*dataIter));
155 0 : systemFragmentCount_++;
156 0 : frags.erase(dataIter);
157 0 : continue;
158 0 : }
159 :
160 500213 : if (!dataBuffers_.count(frag_id))
161 : {
162 0 : throw cet::exception("FragmentIDs") << "Received Fragment with Fragment ID " << frag_id << ", which is not in the declared Fragment IDs list!";
163 : }
164 :
165 500213 : frags_by_id[frag_id].emplace_back(std::move(*dataIter));
166 500213 : frags.erase(dataIter);
167 : }
168 :
169 300051 : auto type_it = frags_by_id.begin();
170 600133 : while (type_it != frags_by_id.end())
171 : {
172 300082 : auto frag_id = type_it->first;
173 :
174 300082 : waitForDataBufferReady(frag_id);
175 300082 : auto dataBuffer = dataBuffers_[frag_id];
176 300082 : std::lock_guard<std::mutex> dlk(dataBuffer->DataBufferMutex);
177 300082 : switch (mode_)
178 : {
179 14 : case RequestMode::Single: {
180 14 : auto dataIter = type_it->second.rbegin();
181 28 : TLOG(TLVL_ADDFRAGMENT) << "Adding Fragment with Fragment ID " << frag_id << ", Sequence ID " << (*dataIter)->sequenceID() << ", and Timestamp " << (*dataIter)->timestamp() << " to buffer";
182 14 : dataBuffer->DataBuffer.clear();
183 14 : dataBuffer->DataBufferDepthBytes = (*dataIter)->sizeBytes();
184 14 : dataBuffer->DataBuffer.emplace_back(std::move(*dataIter));
185 14 : dataBuffer->DataBufferDepthFragments = 1;
186 14 : type_it->second.clear();
187 : }
188 14 : break;
189 300068 : case RequestMode::Buffer:
190 : case RequestMode::Ignored:
191 : case RequestMode::Window:
192 : case RequestMode::SequenceID:
193 : default:
194 800262 : while (!type_it->second.empty())
195 : {
196 500194 : auto dataIter = type_it->second.begin();
197 1000388 : TLOG(TLVL_ADDFRAGMENT) << "Adding Fragment with Fragment ID " << frag_id << ", Sequence ID " << (*dataIter)->sequenceID() << ", and Timestamp " << (*dataIter)->timestamp() << " to buffer";
198 :
199 500194 : dataBuffer->DataBufferDepthBytes += (*dataIter)->sizeBytes();
200 500194 : dataBuffer->DataBuffer.emplace_back(std::move(*dataIter));
201 500194 : type_it->second.erase(dataIter);
202 : }
203 300068 : dataBuffer->DataBufferDepthFragments = dataBuffer->DataBuffer.size();
204 300068 : break;
205 : }
206 300082 : getDataBufferStats(frag_id);
207 300082 : ++type_it;
208 300082 : }
209 300051 : dataCondition_.notify_all();
210 300051 : }
211 :
212 364 : bool artdaq::FragmentBuffer::check_stop()
213 : {
214 728 : TLOG(TLVL_CHECKSTOP) << "CFG::check_stop: should_stop=" << should_stop_.load();
215 :
216 364 : if (!should_stop_.load()) return false;
217 4 : if (mode_ == RequestMode::Ignored)
218 : {
219 1 : return true;
220 : }
221 :
222 3 : if (requestBuffer_ != nullptr)
223 : {
224 : // check_stop returns true if the CFG should stop. We should wait for the Request Buffer to report Request Receiver stopped before stopping.
225 6 : TLOG(TLVL_DEBUG + 32) << "should_stop is true, requestBuffer_->isRunning() is " << std::boolalpha << requestBuffer_->isRunning();
226 3 : if (!requestBuffer_->isRunning())
227 : {
228 1 : return true;
229 : }
230 : }
231 2 : return false;
232 : }
233 :
234 0 : std::string artdaq::FragmentBuffer::printMode_()
235 : {
236 0 : switch (mode_)
237 : {
238 0 : case RequestMode::Single:
239 0 : return "Single";
240 0 : case RequestMode::Buffer:
241 0 : return "Buffer";
242 0 : case RequestMode::Window:
243 0 : return "Window";
244 0 : case RequestMode::Ignored:
245 0 : return "Ignored";
246 0 : case RequestMode::SequenceID:
247 0 : return "SequenceID";
248 : }
249 :
250 0 : return "ERROR";
251 : }
252 :
253 302 : size_t artdaq::FragmentBuffer::dataBufferFragmentCount_()
254 : {
255 302 : size_t count = 0;
256 606 : for (auto& id : dataBuffers_) count += id.second->DataBufferDepthFragments;
257 302 : count += systemFragmentCount_.load();
258 302 : return count;
259 : }
260 :
261 300082 : bool artdaq::FragmentBuffer::waitForDataBufferReady(Fragment::fragment_id_t id)
262 : {
263 300082 : if (!dataBuffers_.count(id))
264 : {
265 0 : TLOG(TLVL_ERROR) << "DataBufferError: "
266 0 : << "Error in FragmentBuffer: Cannot wait for data buffer for ID " << id << " because it does not exist!";
267 0 : throw cet::exception("DataBufferError") << "Error in FragmentBuffer: Cannot wait for data buffer for ID " << id << " because it does not exist!";
268 : }
269 300082 : auto startwait = std::chrono::steady_clock::now();
270 300082 : auto first = true;
271 300082 : auto lastwaittime = 0ULL;
272 300082 : auto dataBuffer = dataBuffers_[id];
273 :
274 319687 : while (dataBufferIsTooLarge(id))
275 : {
276 19605 : if (!circularDataBufferMode_)
277 : {
278 19605 : if (should_stop_.load())
279 : {
280 0 : TLOG(TLVL_DEBUG + 32) << "Run ended while waiting for buffer to shrink!";
281 0 : getDataBufferStats(id);
282 0 : dataCondition_.notify_all();
283 0 : return false;
284 : }
285 19605 : auto waittime = TimeUtils::GetElapsedTimeMilliseconds(startwait);
286 :
287 19605 : if (first || (waittime != lastwaittime && waittime % 1000 == 0))
288 : {
289 19605 : std::lock_guard<std::mutex> lk(dataBuffer->DataBufferMutex);
290 19605 : if (dataBufferIsTooLarge(id))
291 : {
292 47748 : TLOG(TLVL_WARNING) << "Bad Omen: Data Buffer has exceeded its size limits. "
293 15916 : << "(seq_id=" << next_sequence_id_ << ", frag_id=" << id
294 15916 : << ", frags=" << dataBuffer->DataBufferDepthFragments << "/" << maxDataBufferDepthFragments_
295 15916 : << ", szB=" << dataBuffer->DataBufferDepthBytes << "/" << maxDataBufferDepthBytes_ << ")"
296 31832 : << ", timestamps=" << dataBuffer->DataBuffer.front()->timestamp() << "-" << dataBuffer->DataBuffer.back()->timestamp();
297 31832 : TLOG(TLVL_DEBUG + 33) << "Bad Omen: Possible causes include requests not getting through or Ignored-mode BR issues";
298 :
299 15916 : if (metricMan)
300 : {
301 111412 : metricMan->sendMetric("Bad Omen wait time", waittime / 1000.0, "s", 1, MetricMode::LastPoint);
302 : }
303 : }
304 19605 : first = false;
305 19605 : }
306 19605 : if (waittime % 5 && waittime != lastwaittime)
307 : {
308 0 : TLOG(TLVL_WAITFORBUFFERREADY) << "getDataLoop: Data Retreival paused for " << waittime << " ms waiting for data buffer to drain";
309 : }
310 19605 : lastwaittime = waittime;
311 19605 : usleep(1000);
312 : }
313 : else
314 : {
315 0 : std::lock_guard<std::mutex> lk(dataBuffer->DataBufferMutex);
316 0 : if (dataBufferIsTooLarge(id))
317 : {
318 0 : auto begin = dataBuffer->DataBuffer.begin();
319 0 : if (begin == dataBuffer->DataBuffer.end())
320 : {
321 0 : TLOG(TLVL_WARNING) << "Data buffer is reported as too large, but doesn't contain any Fragments! Possible corrupt memory!";
322 0 : continue;
323 0 : }
324 0 : if (*begin)
325 : {
326 0 : TLOG(TLVL_WAITFORBUFFERREADY) << "waitForDataBufferReady: Dropping Fragment with timestamp " << (*begin)->timestamp() << " from data buffer (Buffer over-size, circular data buffer mode)";
327 :
328 0 : dataBuffer->DataBufferDepthBytes -= (*begin)->sizeBytes();
329 0 : dataBuffer->DataBuffer.erase(begin);
330 0 : dataBuffer->DataBufferDepthFragments = dataBuffer->DataBuffer.size();
331 0 : dataBuffer->BufferFragmentKept = false; // If any Fragments are removed from data buffer, then we know we don't have to ignore the first one anymore
332 : }
333 : }
334 0 : }
335 : }
336 300082 : return true;
337 300082 : }
338 :
339 389444 : bool artdaq::FragmentBuffer::dataBufferIsTooLarge(Fragment::fragment_id_t id)
340 : {
341 389444 : if (!dataBuffers_.count(id))
342 : {
343 0 : TLOG(TLVL_ERROR) << "DataBufferError: "
344 0 : << "Error in FragmentBuffer: Cannot check size of data buffer for ID " << id << " because it does not exist!";
345 0 : throw cet::exception("DataBufferError") << "Error in FragmentBuffer: Cannot check size of data buffer for ID " << id << " because it does not exist!";
346 : }
347 389444 : auto dataBuffer = dataBuffers_[id];
348 1082746 : return (maxDataBufferDepthFragments_ > 0 && dataBuffer->DataBufferDepthFragments.load() > maxDataBufferDepthFragments_) ||
349 997160 : (maxDataBufferDepthBytes_ > 0 && dataBuffer->DataBufferDepthBytes.load() > maxDataBufferDepthBytes_);
350 389443 : }
351 :
352 300220 : void artdaq::FragmentBuffer::getDataBufferStats(Fragment::fragment_id_t id)
353 : {
354 300220 : if (!dataBuffers_.count(id))
355 : {
356 0 : TLOG(TLVL_ERROR) << "DataBufferError: "
357 0 : << "Error in FragmentBuffer: Cannot get stats of data buffer for ID " << id << " because it does not exist!";
358 0 : throw cet::exception("DataBufferError") << "Error in FragmentBuffer: Cannot get stats of data buffer for ID " << id << " because it does not exist!";
359 : }
360 300220 : auto dataBuffer = dataBuffers_[id];
361 :
362 300220 : if (metricMan)
363 : {
364 600440 : TLOG(TLVL_GETBUFFERSTATS) << "getDataBufferStats: Sending Metrics";
365 2101540 : metricMan->sendMetric("Buffer Depth Fragments", dataBuffer->DataBufferDepthFragments.load(), "fragments", 1, MetricMode::LastPoint);
366 2101540 : metricMan->sendMetric("Buffer Depth Bytes", dataBuffer->DataBufferDepthBytes.load(), "bytes", 1, MetricMode::LastPoint);
367 :
368 300220 : auto bufferDepthFragmentsPercent = dataBuffer->DataBufferDepthFragments.load() * 100 / static_cast<double>(maxDataBufferDepthFragments_);
369 300220 : auto bufferDepthBytesPercent = dataBuffer->DataBufferDepthBytes.load() * 100 / static_cast<double>(maxDataBufferDepthBytes_);
370 1801320 : metricMan->sendMetric("Fragment Buffer Full %Fragments", bufferDepthFragmentsPercent, "%", 3, MetricMode::LastPoint);
371 1801318 : metricMan->sendMetric("Fragment Buffer Full %Bytes", bufferDepthBytesPercent, "%", 3, MetricMode::LastPoint);
372 2101535 : metricMan->sendMetric("Fragment Buffer Full %", bufferDepthFragmentsPercent > bufferDepthBytesPercent ? bufferDepthFragmentsPercent : bufferDepthBytesPercent, "%", 1, MetricMode::LastPoint);
373 : }
374 600440 : TLOG(TLVL_GETBUFFERSTATS) << "getDataBufferStats: frags=" << dataBuffer->DataBufferDepthFragments.load() << "/" << maxDataBufferDepthFragments_
375 300220 : << ", sz=" << dataBuffer->DataBufferDepthBytes.load() << "/" << maxDataBufferDepthBytes_;
376 300220 : }
377 :
378 : //-----------------------------------------------------------------------------
379 : // P.Murat: return stat reports as a string
380 : //-----------------------------------------------------------------------------
381 0 : std::string artdaq::FragmentBuffer::getStatReport()
382 : {
383 0 : std::ostringstream oss;
384 :
385 0 : for (auto& it : dataBuffers_)
386 : {
387 0 : Fragment::fragment_id_t id = it.first;
388 0 : if (!dataBuffers_.count(id))
389 : {
390 0 : TLOG(TLVL_ERROR) << "DataBufferError: "
391 0 : << "Error in FragmentBuffer: Cannot get stats of data buffer for ID "
392 0 : << id << " because it does not exist!";
393 0 : throw cet::exception("DataBufferError") << "Error in FragmentBuffer: Cannot get stats of data buffer for ID "
394 0 : << id << " because it does not exist!";
395 : }
396 :
397 0 : auto dataBuffer = dataBuffers_[id];
398 :
399 0 : TLOG(TLVL_GETBUFFERSTATS) << "getDataBufferStats: Sending Metrics";
400 :
401 0 : int nf = dataBuffer->DataBufferDepthFragments.load();
402 0 : int nb = dataBuffer->DataBufferDepthBytes.load();
403 0 : oss << std::endl
404 0 : << "fragment_id:" << id << " nfragments:" << nf << " nbytes:" << nb
405 0 : << " max_nf:" << maxDataBufferDepthFragments_ << " max_nb:" << maxDataBufferDepthBytes_;
406 :
407 0 : TLOG(TLVL_GETBUFFERSTATS) << "getDataBufferStats: frags=" << dataBuffer->DataBufferDepthFragments.load() << "/" << maxDataBufferDepthFragments_
408 0 : << ", sz=" << dataBuffer->DataBufferDepthBytes.load() << "/" << maxDataBufferDepthBytes_;
409 0 : }
410 :
411 0 : return oss.str();
412 0 : }
413 :
414 231 : void artdaq::FragmentBuffer::checkDataBuffer(Fragment::fragment_id_t id)
415 : {
416 231 : if (!dataBuffers_.count(id))
417 : {
418 0 : TLOG(TLVL_ERROR) << "DataBufferError: "
419 0 : << "Error in FragmentBuffer: Cannot check data buffer for ID " << id << " because it does not exist!";
420 0 : throw cet::exception("DataBufferError") << "Error in FragmentBuffer: Cannot check data buffer for ID " << id << " because it does not exist!";
421 : }
422 :
423 231 : if (dataBuffers_[id]->DataBufferDepthFragments > 0 && mode_ != RequestMode::Single && mode_ != RequestMode::Ignored)
424 : {
425 88 : auto dataBuffer = dataBuffers_[id];
426 88 : std::lock_guard<std::mutex> lk(dataBuffer->DataBufferMutex);
427 :
428 : // Eliminate extra fragments
429 50152 : while (dataBufferIsTooLarge(id))
430 : {
431 50064 : auto begin = dataBuffer->DataBuffer.begin();
432 100128 : TLOG(TLVL_CHECKDATABUFFER) << "checkDataBuffer: Dropping Fragment with timestamp " << (*begin)->timestamp() << " from data buffer (Buffer over-size)";
433 50064 : dataBuffer->DataBufferDepthBytes -= (*begin)->sizeBytes();
434 50064 : dataBuffer->DataBuffer.erase(begin);
435 50064 : dataBuffer->DataBufferDepthFragments = dataBuffer->DataBuffer.size();
436 50064 : dataBuffer->BufferFragmentKept = false; // If any Fragments are removed from data buffer, then we know we don't have to ignore the first one anymore
437 : }
438 :
439 176 : TLOG(TLVL_CHECKDATABUFFER) << "DataBufferDepthFragments is " << dataBuffer->DataBufferDepthFragments << ", DataBuffer.size is " << dataBuffer->DataBuffer.size();
440 88 : if (dataBuffer->DataBufferDepthFragments > 0 && staleTimeout_ > 0)
441 : {
442 0 : TLOG(TLVL_CHECKDATABUFFER) << "Determining if Fragments can be dropped from data buffer";
443 0 : Fragment::timestamp_t last = dataBuffer->DataBuffer.back()->timestamp();
444 0 : Fragment::timestamp_t min = last > staleTimeout_ ? last - staleTimeout_ : 0;
445 0 : for (auto it = dataBuffer->DataBuffer.begin(); it != dataBuffer->DataBuffer.end();)
446 : {
447 0 : if ((*it)->timestamp() < min)
448 : {
449 0 : TLOG(TLVL_CHECKDATABUFFER) << "checkDataBuffer: Dropping Fragment with timestamp " << (*it)->timestamp() << " from data buffer (timeout=" << staleTimeout_ << ", min=" << min << ")";
450 0 : dataBuffer->DataBufferDepthBytes -= (*it)->sizeBytes();
451 0 : dataBuffer->BufferFragmentKept = false; // If any Fragments are removed from data buffer, then we know we don't have to ignore the first one anymore
452 0 : it = dataBuffer->DataBuffer.erase(it);
453 0 : dataBuffer->DataBufferDepthFragments = dataBuffer->DataBuffer.size();
454 : }
455 : else
456 : {
457 0 : break;
458 : }
459 : }
460 : }
461 88 : }
462 231 : }
463 :
464 5 : void artdaq::FragmentBuffer::applyRequestsIgnoredMode(artdaq::FragmentPtrs& frags)
465 : {
466 : // dataBuffersMutex_ is held by calling function
467 : // We just copy everything that's here into the output.
468 10 : TLOG(TLVL_APPLYREQUESTS) << "Mode is Ignored; Copying data to output";
469 12 : for (auto& id : dataBuffers_)
470 : {
471 7 : std::lock_guard<std::mutex> lk(id.second->DataBufferMutex);
472 7 : if (id.second && !id.second->DataBuffer.empty() && id.second->DataBuffer.back()->sequenceID() >= next_sequence_id_)
473 : {
474 4 : next_sequence_id_ = id.second->DataBuffer.back()->sequenceID() + 1;
475 : }
476 7 : std::move(id.second->DataBuffer.begin(), id.second->DataBuffer.end(), std::inserter(frags, frags.end()));
477 7 : id.second->DataBufferDepthBytes = 0;
478 7 : id.second->DataBufferDepthFragments = 0;
479 7 : id.second->BufferFragmentKept = false;
480 7 : id.second->DataBuffer.clear();
481 7 : }
482 5 : }
483 :
484 11 : void artdaq::FragmentBuffer::applyRequestsSingleMode(artdaq::FragmentPtrs& frags)
485 : {
486 : // We only care about the latest request received. Send empties for all others.
487 11 : auto requests = requestBuffer_->GetRequests();
488 11 : while (requests.size() > 1)
489 : {
490 : // std::map is ordered by key => Last sequence ID in the map is the one we care about
491 0 : requestBuffer_->RemoveRequest(requests.begin()->first);
492 0 : requests.erase(requests.begin());
493 : }
494 11 : sendEmptyFragments(frags, requests);
495 :
496 : // If no requests remain after sendEmptyFragments, return
497 11 : if (requests.size() == 0 || !requests.count(next_sequence_id_)) return;
498 :
499 28 : for (auto& id : dataBuffers_)
500 : {
501 18 : std::lock_guard<std::mutex> lk(id.second->DataBufferMutex);
502 18 : if (id.second->DataBufferDepthFragments > 0)
503 : {
504 18 : assert(id.second->DataBufferDepthFragments == 1);
505 36 : TLOG(TLVL_APPLYREQUESTS) << "Mode is Single; Sending copy of last event (SeqID " << next_sequence_id_ << ")";
506 36 : for (auto& fragptr : id.second->DataBuffer)
507 : {
508 : // Return the latest data point
509 18 : auto frag = fragptr.get();
510 18 : auto newfrag = std::unique_ptr<artdaq::Fragment>(new Fragment(next_sequence_id_, frag->fragmentID()));
511 18 : newfrag->resize(frag->size() - detail::RawFragmentHeader::num_words());
512 18 : memcpy(newfrag->headerAddress(), frag->headerAddress(), frag->sizeBytes());
513 18 : newfrag->setTimestamp(requests[next_sequence_id_]);
514 18 : newfrag->setSequenceID(next_sequence_id_);
515 18 : frags.push_back(std::move(newfrag));
516 18 : }
517 : }
518 : else
519 : {
520 0 : sendEmptyFragment(frags, next_sequence_id_, id.first, "No data for");
521 : }
522 18 : }
523 10 : requestBuffer_->RemoveRequest(next_sequence_id_);
524 10 : ++next_sequence_id_;
525 11 : }
526 :
527 17 : void artdaq::FragmentBuffer::applyRequestsBufferMode(artdaq::FragmentPtrs& frags)
528 : {
529 : // We only care about the latest request received. Send empties for all others.
530 17 : auto requests = requestBuffer_->GetRequests();
531 17 : while (requests.size() > 1)
532 : {
533 : // std::map is ordered by key => Last sequence ID in the map is the one we care about
534 0 : requestBuffer_->RemoveRequest(requests.begin()->first);
535 0 : requests.erase(requests.begin());
536 : }
537 17 : sendEmptyFragments(frags, requests);
538 :
539 : // If no requests remain after sendEmptyFragments, return
540 17 : if (requests.size() == 0 || !requests.count(next_sequence_id_)) return;
541 :
542 48 : for (auto& id : dataBuffers_)
543 : {
544 62 : TLOG(TLVL_APPLYREQUESTS) << "applyRequestsBufferMode: Creating ContainerFragment for Buffered Fragments (SeqID " << next_sequence_id_ << ")";
545 31 : frags.emplace_back(new artdaq::Fragment(next_sequence_id_, id.first));
546 31 : frags.back()->setTimestamp(requests[next_sequence_id_]);
547 31 : ContainerFragmentLoader cfl(*frags.back());
548 31 : cfl.set_missing_data(false); // Buffer mode is never missing data, even if there IS no data.
549 :
550 : // If we kept a Fragment from the previous iteration, but more data has arrived, discard it
551 31 : std::lock_guard<std::mutex> lk(id.second->DataBufferMutex);
552 31 : if (id.second->BufferFragmentKept && id.second->DataBufferDepthFragments > 1)
553 : {
554 1 : id.second->DataBufferDepthBytes -= id.second->DataBuffer.front()->sizeBytes();
555 1 : id.second->DataBuffer.erase(id.second->DataBuffer.begin());
556 1 : id.second->DataBufferDepthFragments = id.second->DataBuffer.size();
557 : }
558 :
559 : // Buffer mode TFGs should simply copy out the whole dataBuffer_ into a ContainerFragment
560 31 : FragmentPtrs fragsToAdd;
561 31 : std::move(id.second->DataBuffer.begin(), --id.second->DataBuffer.end(), std::back_inserter(fragsToAdd));
562 31 : id.second->DataBuffer.erase(id.second->DataBuffer.begin(), --id.second->DataBuffer.end());
563 :
564 31 : if (fragsToAdd.size() > 0)
565 : {
566 26 : TLOG(TLVL_APPLYREQUESTS) << "applyRequestsBufferMode: Adding " << fragsToAdd.size() << " Fragments to Container (SeqID " << next_sequence_id_ << ")";
567 13 : cfl.addFragments(fragsToAdd);
568 : }
569 : else
570 : {
571 36 : TLOG(TLVL_APPLYREQUESTS) << "applyRequestsBufferMode: No Fragments to add (SeqID " << next_sequence_id_ << ")";
572 : }
573 :
574 31 : if (id.second->DataBuffer.size() == 1)
575 : {
576 46 : TLOG(TLVL_APPLYREQUESTS) << "applyRequestsBufferMode: Adding Fragment with timestamp " << id.second->DataBuffer.front()->timestamp() << " to Container with sequence ID " << next_sequence_id_;
577 23 : cfl.addFragment(id.second->DataBuffer.front());
578 23 : if (bufferModeKeepLatest_)
579 : {
580 3 : id.second->BufferFragmentKept = true;
581 3 : id.second->DataBufferDepthBytes = id.second->DataBuffer.front()->sizeBytes();
582 3 : id.second->DataBufferDepthFragments = id.second->DataBuffer.size(); // 1
583 : }
584 : else
585 : {
586 20 : id.second->DataBuffer.clear();
587 20 : id.second->BufferFragmentKept = false;
588 20 : id.second->DataBufferDepthBytes = 0;
589 20 : id.second->DataBufferDepthFragments = 0;
590 : }
591 : }
592 31 : }
593 17 : requestBuffer_->RemoveRequest(next_sequence_id_);
594 17 : ++next_sequence_id_;
595 17 : }
596 :
597 300053 : void artdaq::FragmentBuffer::applyRequestsWindowMode_CheckAndFillDataBuffer(artdaq::FragmentPtrs& frags, artdaq::Fragment::fragment_id_t id, artdaq::Fragment::sequence_id_t seq, artdaq::Fragment::timestamp_t ts)
598 : {
599 300053 : auto dataBuffer = dataBuffers_[id];
600 :
601 600106 : TLOG(TLVL_APPLYREQUESTS) << "applyRequestsWindowMode_CheckAndFillDataBuffer: Checking that data exists for request window " << seq;
602 300053 : Fragment::timestamp_t min = ts > windowOffset_ ? ts - windowOffset_ : 0;
603 300053 : Fragment::timestamp_t max = ts + windowWidth_ > windowOffset_ ? ts + windowWidth_ - windowOffset_ : 1;
604 :
605 600106 : TLOG(TLVL_APPLYREQUESTS) << "ApplyRequestsWindowsMode_CheckAndFillDataBuffer: min is " << min << ", max is " << max
606 0 : << " and first/last points in buffer are " << (dataBuffer->DataBufferDepthFragments > 0 ? dataBuffer->DataBuffer.front()->timestamp() : 0)
607 0 : << "/" << (dataBuffer->DataBufferDepthFragments > 0 ? dataBuffer->DataBuffer.back()->timestamp() : 0)
608 0 : << " (sz=" << dataBuffer->DataBufferDepthFragments << " [" << dataBuffer->DataBufferDepthBytes.load()
609 300053 : << "/" << maxDataBufferDepthBytes_ << "])";
610 300053 : bool windowClosed = dataBuffer->DataBufferDepthFragments > 0 && dataBuffer->DataBuffer.back()->timestamp() >= max;
611 300053 : bool windowTimeout = !windowClosed && TimeUtils::GetElapsedTimeMicroseconds(requestBuffer_->GetRequestTime(seq)) > window_close_timeout_us_;
612 300053 : if (windowTimeout)
613 : {
614 21 : TLOG(TLVL_WARNING) << "applyRequestsWindowMode_CheckAndFillDataBuffer: A timeout occurred waiting for data to close the request window ({" << min << "-" << max
615 7 : << "}, buffer={" << (dataBuffer->DataBufferDepthFragments > 0 ? dataBuffer->DataBuffer.front()->timestamp() : 0) << "-"
616 7 : << (dataBuffer->DataBufferDepthFragments > 0 ? dataBuffer->DataBuffer.back()->timestamp() : 0)
617 7 : << "} ). Time waiting: "
618 14 : << TimeUtils::GetElapsedTimeMicroseconds(requestBuffer_->GetRequestTime(seq)) << " us "
619 14 : << "(> " << window_close_timeout_us_ << " us).";
620 : }
621 300053 : if (windowClosed || windowTimeout)
622 : {
623 600074 : TLOG(TLVL_APPLYREQUESTS) << "applyRequestsWindowMode_CheckAndFillDataBuffer: Creating ContainerFragment for Window-requested Fragments (SeqID " << seq << ")";
624 300037 : frags.emplace_back(new artdaq::Fragment(seq, id));
625 300037 : frags.back()->setTimestamp(ts);
626 300037 : ContainerFragmentLoader cfl(*frags.back());
627 :
628 : // In the spirit of NOvA's MegaPool: (RS = Request start (min), RE = Request End (max))
629 : // --- | Buffer Start | --- | Buffer End | ---
630 : // 1. RS RE | | | |
631 : // 2. RS | | RE | |
632 : // 3. RS | | | | RE
633 : // 4. | | RS RE | |
634 : // 5. | | RS | | RE
635 : // 6. | | | | RS RE
636 : //
637 : // If RE (or RS) is after the end of the buffer, we wait for window_close_timeout_us_. If we're here, then that means that windowClosed is false, and the missing_data flag should be set.
638 : // If RS (or RE) is before the start of the buffer, then missing_data should be set to true, as data is assumed to arrive in the buffer in timestamp order
639 : // If the dataBuffer has size 0, then windowClosed will be false
640 300037 : if (!windowClosed || (dataBuffer->DataBufferDepthFragments > 0 && dataBuffer->DataBuffer.front()->timestamp() > min))
641 : {
642 100028 : TLOG(TLVL_DEBUG + 32) << "applyRequestsWindowMode_CheckAndFillDataBuffer: Request window starts before and/or ends after the current data buffer, setting ContainerFragment's missing_data flag!"
643 0 : << " (requestWindowRange=[" << min << "," << max << "], "
644 0 : << "buffer={" << (dataBuffer->DataBufferDepthFragments > 0 ? dataBuffer->DataBuffer.front()->timestamp() : 0) << "-"
645 50014 : << (dataBuffer->DataBufferDepthFragments > 0 ? dataBuffer->DataBuffer.back()->timestamp() : 0) << "} (SeqID " << seq << ")";
646 50014 : cfl.set_missing_data(true);
647 : }
648 :
649 300037 : auto it = dataBuffer->DataBuffer.begin();
650 : // Likely that it will be closer to the end...
651 300037 : if (windowTimeout)
652 : {
653 7 : it = dataBuffer->DataBuffer.end();
654 7 : --it;
655 8 : while (it != dataBuffer->DataBuffer.begin())
656 : {
657 2 : if ((*it)->timestamp() < min)
658 : {
659 1 : break;
660 : }
661 1 : --it;
662 : }
663 : }
664 :
665 300037 : FragmentPtrs fragsToAdd;
666 : // Do a little bit more work to decide which fragments to send for a given request
667 550109 : for (; it != dataBuffer->DataBuffer.end();)
668 : {
669 550088 : Fragment::timestamp_t fragT = (*it)->timestamp();
670 550088 : if (fragT < min)
671 : {
672 34 : ++it;
673 34 : continue;
674 : }
675 550054 : if (fragT > max || (fragT == max && windowWidth_ > 0))
676 : {
677 : break;
678 : }
679 :
680 500076 : TLOG(TLVL_APPLYREQUESTS_VERBOSE) << "applyRequestsWindowMode_CheckAndFillDataBuffer: Adding Fragment with timestamp " << (*it)->timestamp() << " to Container (SeqID " << seq << ")";
681 250038 : if (uniqueWindows_)
682 : {
683 250038 : dataBuffer->DataBufferDepthBytes -= (*it)->sizeBytes();
684 250038 : fragsToAdd.emplace_back(std::move(*it));
685 250038 : it = dataBuffer->DataBuffer.erase(it);
686 : }
687 : else
688 : {
689 0 : fragsToAdd.emplace_back(it->get());
690 0 : ++it;
691 : }
692 : }
693 :
694 300037 : if (fragsToAdd.size() > 0)
695 : {
696 500054 : TLOG(TLVL_APPLYREQUESTS) << "applyRequestsWindowMode_CheckAndFillDataBuffer: Adding " << fragsToAdd.size() << " Fragments to Container (SeqID " << seq << ")";
697 250027 : cfl.addFragments(fragsToAdd);
698 :
699 : // Don't delete Fragments which are still in the Fragment buffer
700 250027 : if (!uniqueWindows_)
701 : {
702 0 : for (auto& frag : fragsToAdd)
703 : {
704 0 : frag.release();
705 : }
706 : }
707 250027 : fragsToAdd.clear();
708 : }
709 : else
710 : {
711 100020 : TLOG(error_on_empty_ ? TLVL_ERROR : TLVL_APPLYREQUESTS) << "applyRequestsWindowMode_CheckAndFillDataBuffer: No Fragments match request (SeqID " << seq << ", window " << min << " - " << max << ")";
712 : }
713 :
714 300037 : dataBuffer->DataBufferDepthFragments = dataBuffer->DataBuffer.size();
715 300037 : dataBuffer->WindowsSent[seq] = std::chrono::steady_clock::now();
716 300037 : if (seq > dataBuffer->HighestRequestSeen) dataBuffer->HighestRequestSeen = seq;
717 300037 : }
718 300053 : }
719 :
720 41 : void artdaq::FragmentBuffer::applyRequestsWindowMode(artdaq::FragmentPtrs& frags)
721 : {
722 82 : TLOG(TLVL_APPLYREQUESTS) << "applyRequestsWindowMode BEGIN";
723 :
724 41 : auto requests = requestBuffer_->GetRequests();
725 :
726 82 : TLOG(TLVL_APPLYREQUESTS) << "applyRequestsWindowMode: Starting request processing for " << requests.size() << " requests";
727 300076 : for (auto req = requests.begin(); req != requests.end();)
728 : {
729 600070 : TLOG(TLVL_APPLYREQUESTS) << "applyRequestsWindowMode: processing request with sequence ID " << req->first << ", timestamp " << req->second;
730 :
731 300035 : while (req->first < next_sequence_id_ && requests.size() > 0)
732 : {
733 0 : TLOG(TLVL_APPLYREQUESTS_VERBOSE) << "applyRequestsWindowMode: Clearing passed request for sequence ID " << req->first;
734 0 : requestBuffer_->RemoveRequest(req->first);
735 0 : req = requests.erase(req);
736 : }
737 300035 : if (requests.size() == 0) break;
738 :
739 300035 : auto ts = req->second;
740 300035 : if (ts == Fragment::InvalidTimestamp)
741 : {
742 0 : TLOG(TLVL_ERROR) << "applyRequestsWindowMode: Received InvalidTimestamp in request " << req->first << ", cannot apply! Check that push-mode BRs are filling appropriate timestamps in their Fragments!";
743 0 : req = requests.erase(req);
744 0 : continue;
745 0 : }
746 :
747 600090 : for (auto& id : dataBuffers_)
748 : {
749 300055 : std::lock_guard<std::mutex> lk(id.second->DataBufferMutex);
750 300055 : if (!id.second->WindowsSent.count(req->first))
751 : {
752 300053 : applyRequestsWindowMode_CheckAndFillDataBuffer(frags, id.first, req->first, req->second);
753 : }
754 300055 : }
755 300035 : checkSentWindows(req->first);
756 300035 : ++req;
757 : }
758 :
759 : // Check sent windows for requests that can be removed
760 41 : std::set<artdaq::Fragment::sequence_id_t> seqs;
761 102 : for (auto& id : dataBuffers_)
762 : {
763 61 : std::lock_guard<std::mutex> lk(id.second->DataBufferMutex);
764 83 : for (auto& seq : id.second->WindowsSent)
765 : {
766 22 : seqs.insert(seq.first);
767 : }
768 61 : }
769 52 : for (auto& seq : seqs)
770 : {
771 11 : checkSentWindows(seq);
772 : }
773 41 : }
774 :
775 10 : void artdaq::FragmentBuffer::applyRequestsSequenceIDMode(artdaq::FragmentPtrs& frags)
776 : {
777 20 : TLOG(TLVL_APPLYREQUESTS) << "applyRequestsSequenceIDMode BEGIN";
778 :
779 10 : auto requests = requestBuffer_->GetRequests();
780 :
781 20 : TLOG(TLVL_APPLYREQUESTS) << "applyRequestsSequenceIDMode: Starting request processing";
782 20 : for (auto req = requests.begin(); req != requests.end();)
783 : {
784 20 : TLOG(TLVL_APPLYREQUESTS) << "applyRequestsSequenceIDMode: Checking that data exists for request SequenceID " << req->first;
785 :
786 30 : for (auto& id : dataBuffers_)
787 : {
788 20 : std::lock_guard<std::mutex> lk(id.second->DataBufferMutex);
789 20 : if (!id.second->WindowsSent.count(req->first))
790 : {
791 40 : TLOG(TLVL_APPLYREQUESTS_VERBOSE) << "Searching id " << id.first << " for Fragments with Sequence ID " << req->first;
792 40 : for (auto it = id.second->DataBuffer.begin(); it != id.second->DataBuffer.end();)
793 : {
794 20 : auto seq = (*it)->sequenceID();
795 40 : TLOG(TLVL_APPLYREQUESTS_VERBOSE) << "applyRequestsSequenceIDMode: Fragment SeqID " << seq << ", request ID " << req->first;
796 20 : if (seq == req->first)
797 : {
798 32 : TLOG(TLVL_APPLYREQUESTS_VERBOSE) << "applyRequestsSequenceIDMode: Adding Fragment to output";
799 16 : id.second->WindowsSent[req->first] = std::chrono::steady_clock::now();
800 16 : id.second->DataBufferDepthBytes -= (*it)->sizeBytes();
801 16 : frags.push_back(std::move(*it));
802 16 : it = id.second->DataBuffer.erase(it);
803 16 : id.second->DataBufferDepthFragments = id.second->DataBuffer.size();
804 : }
805 : else
806 : {
807 4 : ++it;
808 : }
809 : }
810 : }
811 20 : if (req->first > id.second->HighestRequestSeen) id.second->HighestRequestSeen = req->first;
812 20 : }
813 10 : checkSentWindows(req->first);
814 10 : ++req;
815 : }
816 :
817 : // Check sent windows for requests that can be removed
818 10 : std::set<artdaq::Fragment::sequence_id_t> seqs;
819 30 : for (auto& id : dataBuffers_)
820 : {
821 20 : std::lock_guard<std::mutex> lk(id.second->DataBufferMutex);
822 28 : for (auto& seq : id.second->WindowsSent)
823 : {
824 8 : seqs.insert(seq.first);
825 : }
826 20 : }
827 14 : for (auto& seq : seqs)
828 : {
829 4 : checkSentWindows(seq);
830 : }
831 10 : }
832 :
833 86 : bool artdaq::FragmentBuffer::applyRequests(artdaq::FragmentPtrs& frags)
834 : {
835 86 : if (check_stop())
836 : {
837 2 : return false;
838 : }
839 :
840 : // Wait for data, if in ignored mode, or a request otherwise
841 84 : if (mode_ == RequestMode::Ignored)
842 : {
843 5 : auto start_time = std::chrono::steady_clock::now();
844 104 : while (dataBufferFragmentCount_() == 0 && TimeUtils::GetElapsedTime(start_time) < 1.0)
845 : {
846 99 : if (check_stop()) return false;
847 99 : std::unique_lock<std::mutex> lock(dataConditionMutex_);
848 297 : dataCondition_.wait_for(lock, std::chrono::milliseconds(10), [this]() { return dataBufferFragmentCount_() > 0; });
849 99 : }
850 : }
851 79 : else if (requestBuffer_ == nullptr)
852 : {
853 0 : TLOG(TLVL_ERROR) << "Request Buffer must be set (via SetRequestBuffer) before applyRequests/getData can be called!";
854 0 : return false;
855 : }
856 : else
857 : {
858 79 : if ((check_stop() && requestBuffer_->size() == 0)) return false;
859 :
860 79 : std::unique_lock<std::mutex> lock(dataConditionMutex_);
861 79 : dataCondition_.wait_for(lock, std::chrono::milliseconds(10));
862 :
863 79 : checkDataBuffers();
864 :
865 : // Wait up to 1000 ms for a request...
866 79 : auto counter = 0;
867 :
868 179 : while (requestBuffer_->size() == 0 && counter < 100)
869 : {
870 100 : if (check_stop()) return false;
871 :
872 100 : checkDataBuffers();
873 :
874 100 : requestBuffer_->WaitForRequests(10); // milliseconds
875 100 : counter++;
876 : }
877 79 : }
878 :
879 168 : if (systemFragmentCount_.load() > 0)
880 : {
881 0 : std::lock_guard<std::mutex> lk(systemFragmentMutex_);
882 0 : TLOG(TLVL_INFO) << "Copying " << systemFragmentCount_.load() << " System Fragments into output";
883 :
884 0 : std::move(systemFragments_.begin(), systemFragments_.end(), std::inserter(frags, frags.end()));
885 0 : systemFragments_.clear();
886 0 : systemFragmentCount_ = 0;
887 0 : }
888 :
889 84 : switch (mode_)
890 : {
891 11 : case RequestMode::Single:
892 11 : applyRequestsSingleMode(frags);
893 11 : break;
894 41 : case RequestMode::Window:
895 41 : applyRequestsWindowMode(frags);
896 41 : break;
897 17 : case RequestMode::Buffer:
898 17 : applyRequestsBufferMode(frags);
899 17 : break;
900 10 : case RequestMode::SequenceID:
901 10 : applyRequestsSequenceIDMode(frags);
902 10 : break;
903 5 : case RequestMode::Ignored:
904 : default:
905 5 : applyRequestsIgnoredMode(frags);
906 5 : break;
907 : }
908 :
909 84 : getDataBuffersStats();
910 :
911 84 : if (frags.size() > 0)
912 138 : TLOG(TLVL_APPLYREQUESTS) << "Finished Processing requests, returning " << frags.size() << " fragments, current ev_counter is " << next_sequence_id_;
913 84 : return true;
914 : }
915 :
916 13 : bool artdaq::FragmentBuffer::sendEmptyFragment(artdaq::FragmentPtrs& frags, size_t seqId, Fragment::fragment_id_t fragmentId, std::string desc)
917 : {
918 26 : TLOG(TLVL_EMPTYFRAGMENT) << desc << " sequence ID " << seqId << ", sending empty fragment";
919 13 : auto frag = new Fragment();
920 13 : frag->setSequenceID(seqId);
921 13 : frag->setFragmentID(fragmentId);
922 13 : frag->setSystemType(Fragment::EmptyFragmentType);
923 13 : frags.emplace_back(FragmentPtr(frag));
924 13 : return true;
925 : }
926 :
927 28 : void artdaq::FragmentBuffer::sendEmptyFragments(artdaq::FragmentPtrs& frags, std::map<Fragment::sequence_id_t, Fragment::timestamp_t>& requests)
928 : {
929 28 : if (requests.size() > 0)
930 : {
931 54 : TLOG(TLVL_SENDEMPTYFRAGMENTS) << "Sending Empty Fragments for Sequence IDs from " << next_sequence_id_ << " up to but not including " << requests.begin()->first;
932 34 : while (requests.begin()->first > next_sequence_id_)
933 : {
934 7 : if (sendMissingFragments_)
935 : {
936 20 : for (auto& fid : dataBuffers_)
937 : {
938 26 : sendEmptyFragment(frags, next_sequence_id_, fid.first, "Missed request for");
939 : }
940 : }
941 7 : ++next_sequence_id_;
942 : }
943 : }
944 28 : }
945 :
946 300060 : void artdaq::FragmentBuffer::checkSentWindows(artdaq::Fragment::sequence_id_t seq)
947 : {
948 600120 : TLOG(TLVL_CHECKWINDOWS) << "checkSentWindows: Checking if request " << seq << " can be removed from request list";
949 300060 : bool seqComplete = true;
950 300060 : bool seqTimeout = false;
951 600166 : for (auto& id : dataBuffers_)
952 : {
953 300106 : std::lock_guard<std::mutex> lk(id.second->DataBufferMutex);
954 300106 : if (!id.second->WindowsSent.count(seq) || id.second->HighestRequestSeen < seq)
955 : {
956 21 : seqComplete = false;
957 : }
958 300106 : if (id.second->WindowsSent.count(seq) && TimeUtils::GetElapsedTimeMicroseconds(id.second->WindowsSent[seq]) > missing_request_window_timeout_us_)
959 : {
960 4 : seqTimeout = true;
961 : }
962 300106 : }
963 300060 : if (seqComplete)
964 : {
965 600090 : TLOG(TLVL_CHECKWINDOWS) << "checkSentWindows: Request " << seq << " is complete, removing from requestBuffer_.";
966 300045 : requestBuffer_->RemoveRequest(seq);
967 :
968 300045 : if (next_sequence_id_ == seq)
969 : {
970 600058 : TLOG(TLVL_CHECKWINDOWS) << "checkSentWindows: Sequence ID matches ev_counter, incrementing ev_counter (" << next_sequence_id_ << ")";
971 :
972 600078 : for (auto& id : dataBuffers_)
973 : {
974 300049 : std::lock_guard<std::mutex> lk(id.second->DataBufferMutex);
975 300049 : id.second->WindowsSent.erase(seq);
976 300049 : }
977 :
978 300029 : ++next_sequence_id_;
979 : }
980 : }
981 300060 : if (seqTimeout)
982 : {
983 4 : TLOG(TLVL_CHECKWINDOWS) << "checkSentWindows: Sent Window history indicates that requests between " << next_sequence_id_ << " and " << seq << " have timed out.";
984 6 : while (next_sequence_id_ <= seq)
985 : {
986 6 : if (next_sequence_id_ < seq) TLOG(TLVL_CHECKWINDOWS) << "Missed request for sequence ID " << next_sequence_id_ << "! Will not send any data for this sequence ID!";
987 4 : requestBuffer_->RemoveRequest(next_sequence_id_);
988 :
989 12 : for (auto& id : dataBuffers_)
990 : {
991 8 : std::lock_guard<std::mutex> lk(id.second->DataBufferMutex);
992 8 : id.second->WindowsSent.erase(next_sequence_id_);
993 8 : }
994 :
995 4 : ++next_sequence_id_;
996 : }
997 : }
998 300060 : }
|