Line data Source code
1 : #define TRACE_NAME "FragmentBuffer_t"
2 :
3 : #define BOOST_TEST_MODULE FragmentBuffer_t
#include <boost/test/unit_test.hpp>

#include "artdaq-core/Data/ContainerFragment.hh"
#include "artdaq-core/Data/Fragment.hh"
#include "artdaq-core/Utilities/configureMessageFacility.hh"
#include "artdaq/DAQrate/FragmentBuffer.hh"
#include "artdaq/DAQrate/detail/RequestSender.hh"

#include <memory>
#include <thread>
13 :
14 : #define MESSAGEFACILITY_DEBUG true
15 :
16 : #define RATE_TEST_COUNT 100000
/**
 * \brief Log the outcome of an equality check via TLOG, then enforce it with BOOST_REQUIRE_EQUAL
 * \param l Left-hand expression
 * \param r Right-hand expression
 *
 * A matching pair is logged at TLVL_DEBUG, a mismatch at TLVL_ERROR; in both cases
 * BOOST_REQUIRE_EQUAL(l, r) runs afterwards and is what actually fails the test.
 *
 * The operands are parenthesized wherever they are spliced into an expression so that
 * arguments containing lower-precedence operators (e.g. `a | b`) are compared and
 * streamed as a unit. Each operand is evaluated more than once, so pass only
 * side-effect-free expressions.
 */
#define TRACE_REQUIRE_EQUAL(l, r) \
	do \
	{ \
		if ((l) == (r)) \
		{ \
			TLOG(TLVL_DEBUG) << __LINE__ << ": Checking if " << #l << " (" << (l) << ") equals " << #r << " (" << (r) << ")...YES!"; \
		} \
		else \
		{ \
			TLOG(TLVL_ERROR) << __LINE__ << ": Checking if " << #l << " (" << (l) << ") equals " << #r << " (" << (r) << ")...NO!"; \
		} \
		BOOST_REQUIRE_EQUAL(l, r); \
	} while (0)
30 :
31 : namespace artdaqtest {
32 : class FragmentBufferTestGenerator;
33 : }
34 :
35 : /**
36 : * \brief CommandableFragmentGenerator derived class for testing
37 : */
38 : class artdaqtest::FragmentBufferTestGenerator
39 : {
40 : public:
41 : /**
42 : * \brief FragmentBufferTestGenerator Constructor
43 : */
44 : explicit FragmentBufferTestGenerator(const fhicl::ParameterSet& ps);
45 :
46 : /**
47 : * @brief Generate Fragments
48 : * @param n Number of Fragments to generate
49 : * @param fragmentIds List of Fragment IDs to generate Fragments for (if different than configured fragment IDs)
50 : * @return artdaq::FragmentPtrs containing generated Fragments
51 : */
52 : artdaq::FragmentPtrs Generate(size_t n, std::vector<artdaq::Fragment::fragment_id_t> fragmentIds = std::vector<artdaq::Fragment::fragment_id_t>());
53 :
54 : public:
55 : /**
56 : * \brief Set the timestamp to be used for the next Fragment
57 : * \param ts Timestamp to be used for the next Fragment
58 : */
59 1 : void setTimestamp(artdaq::Fragment::timestamp_t ts) { ts_ = ts; }
60 :
61 : /**
62 : * \brief Get the timestamp that will be used for the next Fragment
63 : * \return The timestamp that will be used for the next Fragment
64 : */
65 : artdaq::Fragment::timestamp_t getTimestamp() { return ts_; }
66 :
67 : private:
68 : artdaq::Fragment::timestamp_t ts_;
69 : artdaq::Fragment::sequence_id_t seq_;
70 : std::set<artdaq::Fragment::fragment_id_t> fragmentIDs_;
71 : };
72 :
73 25 : artdaqtest::FragmentBufferTestGenerator::FragmentBufferTestGenerator(const fhicl::ParameterSet& ps)
74 25 : : ts_(0), seq_(1)
75 : {
76 100 : metricMan->initialize(ps.get<fhicl::ParameterSet>("metrics", fhicl::ParameterSet()));
77 25 : metricMan->do_start();
78 :
79 150 : auto idlist = ps.get<std::vector<artdaq::Fragment::fragment_id_t>>("fragment_ids", {ps.get<artdaq::Fragment::fragment_id_t>("fragment_id", 1)});
80 62 : for (auto& id : idlist)
81 : {
82 37 : fragmentIDs_.insert(id);
83 : }
84 25 : }
85 :
86 300051 : artdaq::FragmentPtrs artdaqtest::FragmentBufferTestGenerator::Generate(size_t n, std::vector<artdaq::Fragment::fragment_id_t> fragmentIds)
87 : {
88 300051 : if (fragmentIds.size() == 0) std::copy(fragmentIDs_.begin(), fragmentIDs_.end(), std::back_inserter(fragmentIds));
89 :
90 300051 : artdaq::FragmentPtrs frags;
91 800193 : while (n > 0)
92 : {
93 500142 : ++ts_;
94 1000355 : for (auto& id : fragmentIds)
95 : {
96 1500639 : TLOG(TLVL_DEBUG) << "Adding Fragment with ID " << id << ", SeqID " << seq_ << ", and timestamp " << ts_;
97 500213 : frags.emplace_back(new artdaq::Fragment(seq_, id, artdaq::Fragment::FirstUserFragmentType, ts_));
98 : }
99 500142 : ++seq_;
100 500142 : n--;
101 : }
102 :
103 300051 : return frags;
104 0 : }
105 :
106 : BOOST_AUTO_TEST_SUITE(FragmentBuffer_t)
107 :
108 2 : BOOST_AUTO_TEST_CASE(ImproperConfiguration)
109 : {
110 1 : artdaq::configureMessageFacility("FragmentBuffer_t", true, MESSAGEFACILITY_DEBUG);
111 3 : TLOG(TLVL_INFO) << "ImproperConfiguration test case BEGIN";
112 1 : fhicl::ParameterSet ps;
113 2 : ps.put<int>("fragment_id", 1);
114 2 : ps.put<artdaq::Fragment::timestamp_t>("request_window_offset", 0);
115 3 : ps.put<artdaq::Fragment::timestamp_t>("request_window_width", 0);
116 3 : ps.put<std::string>("request_mode", "window");
117 :
118 1 : artdaq::FragmentBuffer fp(ps);
119 1 : BOOST_REQUIRE_EQUAL(static_cast<int>(fp.request_mode()), static_cast<int>(artdaq::RequestMode::Ignored));
120 :
121 2 : ps.put<bool>("receive_requests", true);
122 1 : artdaq::FragmentBuffer fpp(ps);
123 1 : BOOST_REQUIRE_EQUAL(static_cast<int>(fpp.request_mode()), static_cast<int>(artdaq::RequestMode::Window));
124 :
125 5 : ps.put<std::vector<int>>("fragment_ids", {2, 3, 4});
126 3 : BOOST_REQUIRE_EXCEPTION(artdaq::FragmentBuffer ffp(ps), cet::exception, [](cet::exception const& e) { return e.category() == "FragmentBufferConfig"; });
127 :
128 3 : TLOG(TLVL_INFO) << "ImproperConfiguration test case END";
129 1 : }
130 :
131 2 : BOOST_AUTO_TEST_CASE(IgnoreRequests)
132 : {
133 1 : artdaq::configureMessageFacility("FragmentBuffer_t", true, MESSAGEFACILITY_DEBUG);
134 3 : TLOG(TLVL_INFO) << "IgnoreRequests test case BEGIN";
135 1 : fhicl::ParameterSet ps;
136 2 : ps.put<int>("fragment_id", 1);
137 2 : ps.put<artdaq::Fragment::timestamp_t>("request_window_offset", 0);
138 3 : ps.put<artdaq::Fragment::timestamp_t>("request_window_width", 0);
139 3 : ps.put<std::string>("request_mode", "ignored");
140 :
141 1 : auto buffer = std::make_shared<artdaq::RequestBuffer>();
142 1 : buffer->setRunning(true);
143 :
144 1 : artdaq::FragmentBuffer fp(ps);
145 1 : fp.SetRequestBuffer(buffer);
146 :
147 1 : buffer->push(53, 35);
148 :
149 1 : artdaqtest::FragmentBufferTestGenerator gen(ps);
150 :
151 1 : fp.AddFragmentsToBuffer(gen.Generate(1));
152 :
153 1 : artdaq::FragmentPtrs fps;
154 1 : auto sts = fp.applyRequests(fps);
155 :
156 4 : TRACE_REQUIRE_EQUAL(sts, true);
157 4 : TRACE_REQUIRE_EQUAL(fps.size(), 1u);
158 4 : TRACE_REQUIRE_EQUAL(fps.front()->fragmentID(), 1);
159 4 : TRACE_REQUIRE_EQUAL(fps.front()->timestamp(), 1);
160 4 : TRACE_REQUIRE_EQUAL(fps.front()->sequenceID(), 1);
161 :
162 3 : TLOG(TLVL_INFO) << "IgnoreRequests test case END";
163 1 : }
164 :
165 2 : BOOST_AUTO_TEST_CASE(SingleMode)
166 : {
167 1 : artdaq::configureMessageFacility("FragmentBuffer_t", true, MESSAGEFACILITY_DEBUG);
168 3 : TLOG(TLVL_INFO) << "SingleMode test case BEGIN";
169 1 : fhicl::ParameterSet ps;
170 2 : ps.put<int>("fragment_id", 1);
171 2 : ps.put<artdaq::Fragment::timestamp_t>("request_window_offset", 0);
172 2 : ps.put<artdaq::Fragment::timestamp_t>("request_window_width", 0);
173 3 : ps.put<bool>("receive_requests", true);
174 3 : ps.put<std::string>("request_mode", "single");
175 :
176 1 : auto buffer = std::make_shared<artdaq::RequestBuffer>();
177 1 : buffer->setRunning(true);
178 1 : artdaqtest::FragmentBufferTestGenerator gen(ps);
179 1 : artdaq::FragmentBuffer fp(ps);
180 1 : fp.SetRequestBuffer(buffer);
181 :
182 1 : buffer->push(1, 1);
183 1 : fp.AddFragmentsToBuffer(gen.Generate(1));
184 4 : TRACE_REQUIRE_EQUAL(fp.GetNextSequenceID(), 1);
185 :
186 1 : artdaq::FragmentPtrs fps;
187 1 : auto sts = fp.applyRequests(fps);
188 1 : auto type = artdaq::Fragment::FirstUserFragmentType;
189 4 : TRACE_REQUIRE_EQUAL(sts, true);
190 4 : TRACE_REQUIRE_EQUAL(fps.size(), 1u);
191 4 : TRACE_REQUIRE_EQUAL(fps.front()->fragmentID(), 1);
192 4 : TRACE_REQUIRE_EQUAL(fps.front()->timestamp(), 1);
193 4 : TRACE_REQUIRE_EQUAL(fps.front()->sequenceID(), 1);
194 4 : TRACE_REQUIRE_EQUAL(fps.front()->type(), type);
195 4 : TRACE_REQUIRE_EQUAL(fp.GetNextSequenceID(), 2);
196 1 : fps.clear();
197 :
198 1 : buffer->push(2, 5);
199 1 : sts = fp.applyRequests(fps);
200 4 : TRACE_REQUIRE_EQUAL(sts, true);
201 4 : TRACE_REQUIRE_EQUAL(fps.size(), 1u);
202 4 : TRACE_REQUIRE_EQUAL(fps.front()->fragmentID(), 1);
203 4 : TRACE_REQUIRE_EQUAL(fps.front()->timestamp(), 5);
204 4 : TRACE_REQUIRE_EQUAL(fps.front()->sequenceID(), 2);
205 4 : TRACE_REQUIRE_EQUAL(fps.front()->type(), type);
206 4 : TRACE_REQUIRE_EQUAL(fp.GetNextSequenceID(), 3);
207 1 : fps.clear();
208 :
209 1 : fp.AddFragmentsToBuffer(gen.Generate(2));
210 1 : buffer->push(4, 7);
211 :
212 1 : sts = fp.applyRequests(fps);
213 4 : TRACE_REQUIRE_EQUAL(fp.GetNextSequenceID(), 5);
214 4 : TRACE_REQUIRE_EQUAL(sts, true);
215 4 : TRACE_REQUIRE_EQUAL(fps.size(), 2);
216 4 : TRACE_REQUIRE_EQUAL(fps.front()->fragmentID(), 1);
217 1 : auto ts = artdaq::Fragment::InvalidTimestamp;
218 4 : TRACE_REQUIRE_EQUAL(fps.front()->timestamp(), ts);
219 4 : TRACE_REQUIRE_EQUAL(fps.front()->sequenceID(), 3);
220 1 : auto emptyType = artdaq::Fragment::EmptyFragmentType;
221 4 : TRACE_REQUIRE_EQUAL(fps.front()->type(), emptyType);
222 1 : fps.pop_front();
223 4 : TRACE_REQUIRE_EQUAL(fps.front()->fragmentID(), 1);
224 4 : TRACE_REQUIRE_EQUAL(fps.front()->timestamp(), 7);
225 4 : TRACE_REQUIRE_EQUAL(fps.front()->sequenceID(), 4);
226 4 : TRACE_REQUIRE_EQUAL(fps.front()->type(), type);
227 1 : fps.clear();
228 :
229 3 : TLOG(TLVL_INFO) << "SingleMode test case END";
230 1 : }
231 :
232 2 : BOOST_AUTO_TEST_CASE(BufferMode)
233 : {
234 1 : artdaq::configureMessageFacility("FragmentBuffer_t", true, MESSAGEFACILITY_DEBUG);
235 3 : TLOG(TLVL_INFO) << "BufferMode test case BEGIN";
236 1 : fhicl::ParameterSet ps;
237 2 : ps.put<int>("fragment_id", 1);
238 2 : ps.put<artdaq::Fragment::timestamp_t>("request_window_offset", 0);
239 2 : ps.put<artdaq::Fragment::timestamp_t>("request_window_width", 0);
240 3 : ps.put<bool>("receive_requests", true);
241 3 : ps.put<std::string>("request_mode", "buffer");
242 :
243 1 : auto buffer = std::make_shared<artdaq::RequestBuffer>();
244 1 : buffer->setRunning(true);
245 1 : artdaqtest::FragmentBufferTestGenerator gen(ps);
246 1 : artdaq::FragmentBuffer fp(ps);
247 1 : fp.SetRequestBuffer(buffer);
248 :
249 1 : buffer->push(1, 1);
250 1 : fp.AddFragmentsToBuffer(gen.Generate(1));
251 :
252 4 : TRACE_REQUIRE_EQUAL(fp.GetNextSequenceID(), 1);
253 :
254 1 : artdaq::FragmentPtrs fps;
255 1 : auto sts = fp.applyRequests(fps);
256 4 : TRACE_REQUIRE_EQUAL(sts, true);
257 4 : TRACE_REQUIRE_EQUAL(fps.size(), 1u);
258 4 : TRACE_REQUIRE_EQUAL(fps.front()->fragmentID(), 1);
259 4 : TRACE_REQUIRE_EQUAL(fps.front()->timestamp(), 1);
260 4 : TRACE_REQUIRE_EQUAL(fps.front()->sequenceID(), 1);
261 1 : auto type = artdaq::Fragment::ContainerFragmentType;
262 4 : TRACE_REQUIRE_EQUAL(fps.front()->type(), type);
263 1 : BOOST_REQUIRE_GE(fps.front()->sizeBytes(), 2 * sizeof(artdaq::detail::RawFragmentHeader) + sizeof(artdaq::ContainerFragment::Metadata));
264 1 : auto cf = artdaq::ContainerFragment(*fps.front());
265 4 : TRACE_REQUIRE_EQUAL(cf.block_count(), 1);
266 4 : TRACE_REQUIRE_EQUAL(cf.missing_data(), false);
267 1 : type = artdaq::Fragment::FirstUserFragmentType;
268 4 : TRACE_REQUIRE_EQUAL(cf.fragment_type(), type);
269 4 : TRACE_REQUIRE_EQUAL(fp.GetNextSequenceID(), 2);
270 1 : fps.clear();
271 :
272 1 : buffer->push(2, 5);
273 1 : sts = fp.applyRequests(fps);
274 4 : TRACE_REQUIRE_EQUAL(sts, true);
275 4 : TRACE_REQUIRE_EQUAL(fps.size(), 1u);
276 4 : TRACE_REQUIRE_EQUAL(fps.front()->fragmentID(), 1);
277 4 : TRACE_REQUIRE_EQUAL(fps.front()->timestamp(), 5);
278 4 : TRACE_REQUIRE_EQUAL(fps.front()->sequenceID(), 2);
279 1 : type = artdaq::Fragment::ContainerFragmentType;
280 4 : TRACE_REQUIRE_EQUAL(fps.front()->type(), type);
281 1 : auto cf2 = artdaq::ContainerFragment(*fps.front());
282 4 : TRACE_REQUIRE_EQUAL(cf2.block_count(), 0);
283 4 : TRACE_REQUIRE_EQUAL(cf2.missing_data(), false);
284 1 : type = artdaq::Fragment::EmptyFragmentType;
285 4 : TRACE_REQUIRE_EQUAL(cf2.fragment_type(), type);
286 4 : TRACE_REQUIRE_EQUAL(fp.GetNextSequenceID(), 3);
287 1 : fps.clear();
288 :
289 1 : fp.AddFragmentsToBuffer(gen.Generate(2));
290 1 : buffer->push(4, 7);
291 :
292 1 : sts = fp.applyRequests(fps);
293 4 : TRACE_REQUIRE_EQUAL(sts, true);
294 4 : TRACE_REQUIRE_EQUAL(fps.size(), 2);
295 :
296 4 : TRACE_REQUIRE_EQUAL(fps.front()->fragmentID(), 1);
297 1 : auto ts = artdaq::Fragment::InvalidTimestamp;
298 4 : TRACE_REQUIRE_EQUAL(fps.front()->timestamp(), ts);
299 4 : TRACE_REQUIRE_EQUAL(fps.front()->sequenceID(), 3);
300 1 : auto emptyType = artdaq::Fragment::EmptyFragmentType;
301 4 : TRACE_REQUIRE_EQUAL(fps.front()->type(), emptyType);
302 4 : TRACE_REQUIRE_EQUAL(fps.front()->size(), artdaq::detail::RawFragmentHeader::num_words());
303 1 : fps.pop_front();
304 4 : TRACE_REQUIRE_EQUAL(fps.front()->fragmentID(), 1);
305 4 : TRACE_REQUIRE_EQUAL(fps.front()->timestamp(), 7);
306 4 : TRACE_REQUIRE_EQUAL(fps.front()->sequenceID(), 4);
307 1 : type = artdaq::Fragment::ContainerFragmentType;
308 4 : TRACE_REQUIRE_EQUAL(fps.front()->type(), type);
309 1 : auto cf3 = artdaq::ContainerFragment(*fps.front());
310 4 : TRACE_REQUIRE_EQUAL(cf3.block_count(), 2);
311 4 : TRACE_REQUIRE_EQUAL(cf3.missing_data(), false);
312 1 : type = artdaq::Fragment::FirstUserFragmentType;
313 4 : TRACE_REQUIRE_EQUAL(cf3.fragment_type(), type);
314 1 : fps.clear();
315 4 : TRACE_REQUIRE_EQUAL(fp.GetNextSequenceID(), 5);
316 :
317 3 : TLOG(TLVL_INFO) << "BufferMode test case END";
318 1 : }
319 :
320 2 : BOOST_AUTO_TEST_CASE(BufferMode_KeepLatest)
321 : {
322 1 : artdaq::configureMessageFacility("FragmentBuffer_t", true, MESSAGEFACILITY_DEBUG);
323 3 : TLOG(TLVL_INFO) << "BufferMode_KeepLatest test case BEGIN";
324 1 : fhicl::ParameterSet ps;
325 2 : ps.put<int>("fragment_id", 1);
326 2 : ps.put<artdaq::Fragment::timestamp_t>("request_window_offset", 0);
327 2 : ps.put<artdaq::Fragment::timestamp_t>("request_window_width", 0);
328 3 : ps.put<bool>("receive_requests", true);
329 3 : ps.put<std::string>("request_mode", "buffer");
330 2 : ps.put("buffer_mode_keep_latest", true);
331 :
332 1 : auto buffer = std::make_shared<artdaq::RequestBuffer>();
333 1 : buffer->setRunning(true);
334 1 : artdaqtest::FragmentBufferTestGenerator gen(ps);
335 1 : artdaq::FragmentBuffer fp(ps);
336 1 : fp.SetRequestBuffer(buffer);
337 :
338 1 : buffer->push(1, 1);
339 1 : fp.AddFragmentsToBuffer(gen.Generate(1));
340 :
341 4 : TRACE_REQUIRE_EQUAL(fp.GetNextSequenceID(), 1);
342 :
343 1 : artdaq::FragmentPtrs fps;
344 1 : auto sts = fp.applyRequests(fps);
345 4 : TRACE_REQUIRE_EQUAL(sts, true);
346 4 : TRACE_REQUIRE_EQUAL(fps.size(), 1u);
347 4 : TRACE_REQUIRE_EQUAL(fps.front()->fragmentID(), 1);
348 4 : TRACE_REQUIRE_EQUAL(fps.front()->timestamp(), 1);
349 4 : TRACE_REQUIRE_EQUAL(fps.front()->sequenceID(), 1);
350 1 : auto type = artdaq::Fragment::ContainerFragmentType;
351 4 : TRACE_REQUIRE_EQUAL(fps.front()->type(), type);
352 1 : BOOST_REQUIRE_GE(fps.front()->sizeBytes(), 2 * sizeof(artdaq::detail::RawFragmentHeader) + sizeof(artdaq::ContainerFragment::Metadata));
353 1 : auto cf = artdaq::ContainerFragment(*fps.front());
354 4 : TRACE_REQUIRE_EQUAL(cf.block_count(), 1);
355 4 : TRACE_REQUIRE_EQUAL(cf.missing_data(), false);
356 1 : type = artdaq::Fragment::FirstUserFragmentType;
357 4 : TRACE_REQUIRE_EQUAL(cf.fragment_type(), type);
358 4 : TRACE_REQUIRE_EQUAL(fp.GetNextSequenceID(), 2);
359 1 : fps.clear();
360 :
361 1 : buffer->push(2, 5);
362 1 : sts = fp.applyRequests(fps);
363 4 : TRACE_REQUIRE_EQUAL(sts, true);
364 4 : TRACE_REQUIRE_EQUAL(fps.size(), 1u);
365 4 : TRACE_REQUIRE_EQUAL(fps.front()->fragmentID(), 1);
366 4 : TRACE_REQUIRE_EQUAL(fps.front()->timestamp(), 5);
367 4 : TRACE_REQUIRE_EQUAL(fps.front()->sequenceID(), 2);
368 1 : type = artdaq::Fragment::ContainerFragmentType;
369 4 : TRACE_REQUIRE_EQUAL(fps.front()->type(), type);
370 1 : auto cf2 = artdaq::ContainerFragment(*fps.front());
371 4 : TRACE_REQUIRE_EQUAL(cf2.block_count(), 1);
372 4 : TRACE_REQUIRE_EQUAL(cf2.missing_data(), false);
373 1 : type = artdaq::Fragment::FirstUserFragmentType;
374 4 : TRACE_REQUIRE_EQUAL(cf2.fragment_type(), type);
375 4 : TRACE_REQUIRE_EQUAL(fp.GetNextSequenceID(), 3);
376 1 : fps.clear();
377 :
378 1 : fp.AddFragmentsToBuffer(gen.Generate(2));
379 1 : buffer->push(4, 7);
380 1 : sts = fp.applyRequests(fps);
381 4 : TRACE_REQUIRE_EQUAL(sts, true);
382 4 : TRACE_REQUIRE_EQUAL(fps.size(), 2);
383 :
384 4 : TRACE_REQUIRE_EQUAL(fps.front()->fragmentID(), 1);
385 1 : auto ts = artdaq::Fragment::InvalidTimestamp;
386 4 : TRACE_REQUIRE_EQUAL(fps.front()->timestamp(), ts);
387 4 : TRACE_REQUIRE_EQUAL(fps.front()->sequenceID(), 3);
388 1 : auto emptyType = artdaq::Fragment::EmptyFragmentType;
389 4 : TRACE_REQUIRE_EQUAL(fps.front()->type(), emptyType);
390 4 : TRACE_REQUIRE_EQUAL(fps.front()->size(), artdaq::detail::RawFragmentHeader::num_words());
391 1 : fps.pop_front();
392 4 : TRACE_REQUIRE_EQUAL(fps.front()->fragmentID(), 1);
393 4 : TRACE_REQUIRE_EQUAL(fps.front()->timestamp(), 7);
394 4 : TRACE_REQUIRE_EQUAL(fps.front()->sequenceID(), 4);
395 1 : type = artdaq::Fragment::ContainerFragmentType;
396 4 : TRACE_REQUIRE_EQUAL(fps.front()->type(), type);
397 1 : auto cf3 = artdaq::ContainerFragment(*fps.front());
398 4 : TRACE_REQUIRE_EQUAL(cf3.block_count(), 2);
399 4 : TRACE_REQUIRE_EQUAL(cf3.missing_data(), false);
400 1 : type = artdaq::Fragment::FirstUserFragmentType;
401 4 : TRACE_REQUIRE_EQUAL(cf3.fragment_type(), type);
402 1 : fps.clear();
403 4 : TRACE_REQUIRE_EQUAL(fp.GetNextSequenceID(), 5);
404 :
405 3 : TLOG(TLVL_INFO) << "BufferMode_KeepLatest test case END";
406 1 : }
407 2 : BOOST_AUTO_TEST_CASE(CircularBufferMode)
408 : {
409 1 : artdaq::configureMessageFacility("FragmentBuffer_t", true, MESSAGEFACILITY_DEBUG);
410 3 : TLOG(TLVL_INFO) << "CircularBufferMode test case BEGIN";
411 1 : fhicl::ParameterSet ps;
412 2 : ps.put<int>("fragment_id", 1);
413 2 : ps.put<artdaq::Fragment::timestamp_t>("request_window_offset", 0);
414 2 : ps.put<artdaq::Fragment::timestamp_t>("request_window_width", 0);
415 2 : ps.put<bool>("receive_requests", true);
416 2 : ps.put<bool>("circular_buffer_mode", true);
417 3 : ps.put<int>("data_buffer_depth_fragments", 3);
418 3 : ps.put<std::string>("request_mode", "buffer");
419 :
420 1 : auto buffer = std::make_shared<artdaq::RequestBuffer>();
421 1 : buffer->setRunning(true);
422 1 : artdaqtest::FragmentBufferTestGenerator gen(ps);
423 1 : artdaq::FragmentBuffer fp(ps);
424 1 : fp.SetRequestBuffer(buffer);
425 :
426 1 : buffer->push(1, 1);
427 1 : fp.AddFragmentsToBuffer(gen.Generate(1));
428 :
429 4 : TRACE_REQUIRE_EQUAL(fp.GetNextSequenceID(), 1);
430 :
431 1 : artdaq::FragmentPtrs fps;
432 1 : auto sts = fp.applyRequests(fps);
433 4 : TRACE_REQUIRE_EQUAL(sts, true);
434 4 : TRACE_REQUIRE_EQUAL(fps.size(), 1u);
435 4 : TRACE_REQUIRE_EQUAL(fps.front()->fragmentID(), 1);
436 4 : TRACE_REQUIRE_EQUAL(fps.front()->timestamp(), 1);
437 4 : TRACE_REQUIRE_EQUAL(fps.front()->sequenceID(), 1);
438 1 : auto type = artdaq::Fragment::ContainerFragmentType;
439 4 : TRACE_REQUIRE_EQUAL(fps.front()->type(), type);
440 1 : BOOST_REQUIRE_GE(fps.front()->sizeBytes(), 2 * sizeof(artdaq::detail::RawFragmentHeader) + sizeof(artdaq::ContainerFragment::Metadata));
441 1 : auto cf = artdaq::ContainerFragment(*fps.front());
442 4 : TRACE_REQUIRE_EQUAL(cf.block_count(), 1);
443 4 : TRACE_REQUIRE_EQUAL(cf.missing_data(), false);
444 1 : type = artdaq::Fragment::FirstUserFragmentType;
445 4 : TRACE_REQUIRE_EQUAL(cf.fragment_type(), type);
446 4 : TRACE_REQUIRE_EQUAL(fp.GetNextSequenceID(), 2);
447 1 : fps.clear();
448 :
449 1 : buffer->push(2, 5);
450 1 : sts = fp.applyRequests(fps);
451 4 : TRACE_REQUIRE_EQUAL(sts, true);
452 4 : TRACE_REQUIRE_EQUAL(fps.size(), 1u);
453 4 : TRACE_REQUIRE_EQUAL(fps.front()->fragmentID(), 1);
454 4 : TRACE_REQUIRE_EQUAL(fps.front()->timestamp(), 5);
455 4 : TRACE_REQUIRE_EQUAL(fps.front()->sequenceID(), 2);
456 1 : type = artdaq::Fragment::ContainerFragmentType;
457 4 : TRACE_REQUIRE_EQUAL(fps.front()->type(), type);
458 1 : auto cf2 = artdaq::ContainerFragment(*fps.front());
459 4 : TRACE_REQUIRE_EQUAL(cf2.block_count(), 0);
460 4 : TRACE_REQUIRE_EQUAL(cf2.missing_data(), false);
461 1 : type = artdaq::Fragment::EmptyFragmentType;
462 4 : TRACE_REQUIRE_EQUAL(cf2.fragment_type(), type);
463 4 : TRACE_REQUIRE_EQUAL(fp.GetNextSequenceID(), 3);
464 1 : fps.clear();
465 :
466 1 : fp.AddFragmentsToBuffer(gen.Generate(3));
467 :
468 1 : buffer->push(4, 7);
469 1 : sts = fp.applyRequests(fps);
470 4 : TRACE_REQUIRE_EQUAL(sts, true);
471 4 : TRACE_REQUIRE_EQUAL(fps.size(), 2);
472 :
473 4 : TRACE_REQUIRE_EQUAL(fps.front()->fragmentID(), 1);
474 1 : auto ts = artdaq::Fragment::InvalidTimestamp;
475 4 : TRACE_REQUIRE_EQUAL(fps.front()->timestamp(), ts);
476 4 : TRACE_REQUIRE_EQUAL(fps.front()->sequenceID(), 3);
477 1 : auto emptyType = artdaq::Fragment::EmptyFragmentType;
478 4 : TRACE_REQUIRE_EQUAL(fps.front()->type(), emptyType);
479 4 : TRACE_REQUIRE_EQUAL(fps.front()->size(), artdaq::detail::RawFragmentHeader::num_words());
480 1 : fps.pop_front();
481 4 : TRACE_REQUIRE_EQUAL(fps.front()->fragmentID(), 1);
482 4 : TRACE_REQUIRE_EQUAL(fps.front()->timestamp(), 7);
483 4 : TRACE_REQUIRE_EQUAL(fps.front()->sequenceID(), 4);
484 1 : type = artdaq::Fragment::ContainerFragmentType;
485 4 : TRACE_REQUIRE_EQUAL(fps.front()->type(), type);
486 1 : auto cf3 = artdaq::ContainerFragment(*fps.front());
487 4 : TRACE_REQUIRE_EQUAL(cf3.block_count(), 3);
488 4 : TRACE_REQUIRE_EQUAL(cf3.missing_data(), false);
489 1 : type = artdaq::Fragment::FirstUserFragmentType;
490 4 : TRACE_REQUIRE_EQUAL(cf3.fragment_type(), type);
491 1 : fps.clear();
492 4 : TRACE_REQUIRE_EQUAL(fp.GetNextSequenceID(), 5);
493 :
494 1 : fp.AddFragmentsToBuffer(gen.Generate(5));
495 :
496 1 : buffer->push(5, 8);
497 :
498 1 : sts = fp.applyRequests(fps);
499 4 : TRACE_REQUIRE_EQUAL(sts, true);
500 4 : TRACE_REQUIRE_EQUAL(fps.size(), 1);
501 :
502 4 : TRACE_REQUIRE_EQUAL(fps.front()->fragmentID(), 1);
503 4 : TRACE_REQUIRE_EQUAL(fps.front()->timestamp(), 8);
504 4 : TRACE_REQUIRE_EQUAL(fps.front()->sequenceID(), 5);
505 1 : type = artdaq::Fragment::ContainerFragmentType;
506 4 : TRACE_REQUIRE_EQUAL(fps.front()->type(), type);
507 1 : auto cf4 = artdaq::ContainerFragment(*fps.front());
508 4 : TRACE_REQUIRE_EQUAL(cf4.block_count(), 3);
509 4 : TRACE_REQUIRE_EQUAL(cf4.missing_data(), false);
510 1 : type = artdaq::Fragment::FirstUserFragmentType;
511 4 : TRACE_REQUIRE_EQUAL(cf4.fragment_type(), type);
512 4 : TRACE_REQUIRE_EQUAL(cf4.at(0)->timestamp(), 7);
513 4 : TRACE_REQUIRE_EQUAL(cf4.at(1)->timestamp(), 8);
514 4 : TRACE_REQUIRE_EQUAL(cf4.at(2)->timestamp(), 9);
515 1 : fps.clear();
516 4 : TRACE_REQUIRE_EQUAL(fp.GetNextSequenceID(), 6);
517 :
518 3 : TLOG(TLVL_INFO) << "CircularBufferMode test case END";
519 1 : }
520 :
521 2 : BOOST_AUTO_TEST_CASE(WindowMode_Function)
522 : {
523 1 : artdaq::configureMessageFacility("FragmentBuffer_t", true, MESSAGEFACILITY_DEBUG);
524 3 : TLOG(TLVL_INFO) << "WindowMode_Function test case BEGIN";
525 1 : fhicl::ParameterSet ps;
526 2 : ps.put<int>("fragment_id", 1);
527 :
528 2 : ps.put<artdaq::Fragment::timestamp_t>("request_window_offset", 0);
529 2 : ps.put<artdaq::Fragment::timestamp_t>("request_window_width", 0);
530 2 : ps.put<size_t>("data_buffer_depth_fragments", 5);
531 3 : ps.put<bool>("circular_buffer_mode", true);
532 3 : ps.put<std::string>("request_mode", "window");
533 2 : ps.put<bool>("receive_requests", true);
534 2 : ps.put<size_t>("missing_request_window_timeout_us", 500000);
535 2 : ps.put<size_t>("window_close_timeout_us", 500000);
536 :
537 1 : auto buffer = std::make_shared<artdaq::RequestBuffer>();
538 1 : buffer->setRunning(true);
539 1 : artdaqtest::FragmentBufferTestGenerator gen(ps);
540 1 : artdaq::FragmentBuffer fp(ps);
541 1 : fp.SetRequestBuffer(buffer);
542 :
543 1 : buffer->push(1, 1);
544 1 : fp.AddFragmentsToBuffer(gen.Generate(1));
545 :
546 4 : TRACE_REQUIRE_EQUAL(fp.GetNextSequenceID(), 1);
547 :
548 1 : artdaq::FragmentPtrs fps;
549 1 : auto sts = fp.applyRequests(fps);
550 4 : TRACE_REQUIRE_EQUAL(fp.GetNextSequenceID(), 2);
551 4 : TRACE_REQUIRE_EQUAL(sts, true);
552 4 : TRACE_REQUIRE_EQUAL(fps.size(), 1u);
553 4 : TRACE_REQUIRE_EQUAL(fps.front()->fragmentID(), 1);
554 4 : TRACE_REQUIRE_EQUAL(fps.front()->timestamp(), 1);
555 4 : TRACE_REQUIRE_EQUAL(fps.front()->sequenceID(), 1);
556 1 : auto type = artdaq::Fragment::ContainerFragmentType;
557 4 : TRACE_REQUIRE_EQUAL(fps.front()->type(), type);
558 1 : BOOST_REQUIRE_GE(fps.front()->sizeBytes(), 2 * sizeof(artdaq::detail::RawFragmentHeader) + sizeof(artdaq::ContainerFragment::Metadata));
559 1 : auto cf = artdaq::ContainerFragment(*fps.front());
560 4 : TRACE_REQUIRE_EQUAL(cf.block_count(), 1);
561 4 : TRACE_REQUIRE_EQUAL(cf.missing_data(), false);
562 1 : type = artdaq::Fragment::FirstUserFragmentType;
563 4 : TRACE_REQUIRE_EQUAL(cf.fragment_type(), type);
564 1 : fps.clear();
565 :
566 : // No data for request
567 1 : buffer->push(2, 2);
568 :
569 1 : sts = fp.applyRequests(fps);
570 4 : TRACE_REQUIRE_EQUAL(fp.GetNextSequenceID(), 2);
571 4 : TRACE_REQUIRE_EQUAL(sts, true);
572 4 : TRACE_REQUIRE_EQUAL(fps.size(), 0);
573 :
574 1 : fp.AddFragmentsToBuffer(gen.Generate(1));
575 :
576 1 : sts = fp.applyRequests(fps);
577 4 : TRACE_REQUIRE_EQUAL(fp.GetNextSequenceID(), 3);
578 4 : TRACE_REQUIRE_EQUAL(sts, true);
579 4 : TRACE_REQUIRE_EQUAL(fps.size(), 1);
580 4 : TRACE_REQUIRE_EQUAL(fps.front()->fragmentID(), 1);
581 4 : TRACE_REQUIRE_EQUAL(fps.front()->timestamp(), 2);
582 4 : TRACE_REQUIRE_EQUAL(fps.front()->sequenceID(), 2);
583 1 : type = artdaq::Fragment::ContainerFragmentType;
584 4 : TRACE_REQUIRE_EQUAL(fps.front()->type(), type);
585 1 : auto cf2 = artdaq::ContainerFragment(*fps.front());
586 4 : TRACE_REQUIRE_EQUAL(cf2.block_count(), 1);
587 4 : TRACE_REQUIRE_EQUAL(cf2.missing_data(), false);
588 1 : type = artdaq::Fragment::FirstUserFragmentType;
589 4 : TRACE_REQUIRE_EQUAL(cf2.fragment_type(), type);
590 1 : fps.clear();
591 :
592 : // Request Timeout
593 1 : buffer->push(4, 3);
594 :
595 1 : sts = fp.applyRequests(fps);
596 4 : TRACE_REQUIRE_EQUAL(sts, true);
597 4 : TRACE_REQUIRE_EQUAL(fps.size(), 0);
598 4 : TRACE_REQUIRE_EQUAL(fp.GetNextSequenceID(), 3);
599 :
600 1 : usleep(1500000);
601 1 : sts = fp.applyRequests(fps);
602 4 : TRACE_REQUIRE_EQUAL(sts, true);
603 4 : TRACE_REQUIRE_EQUAL(fps.size(), 1);
604 :
605 : // Also, missing request timeout
606 1 : auto list = fp.GetSentWindowList(1);
607 4 : TRACE_REQUIRE_EQUAL(list.size(), 1);
608 4 : TRACE_REQUIRE_EQUAL(list.begin()->first, 4);
609 4 : TRACE_REQUIRE_EQUAL(fp.GetNextSequenceID(), 3);
610 :
611 1 : usleep(1500000);
612 :
613 4 : TRACE_REQUIRE_EQUAL(fps.front()->fragmentID(), 1);
614 4 : TRACE_REQUIRE_EQUAL(fps.front()->timestamp(), 3);
615 4 : TRACE_REQUIRE_EQUAL(fps.front()->sequenceID(), 4);
616 1 : type = artdaq::Fragment::ContainerFragmentType;
617 4 : TRACE_REQUIRE_EQUAL(fps.front()->type(), type);
618 1 : auto cf3 = artdaq::ContainerFragment(*fps.front());
619 4 : TRACE_REQUIRE_EQUAL(cf3.block_count(), 0);
620 4 : TRACE_REQUIRE_EQUAL(cf3.missing_data(), true);
621 1 : type = artdaq::Fragment::EmptyFragmentType;
622 4 : TRACE_REQUIRE_EQUAL(cf3.fragment_type(), type);
623 4 : TRACE_REQUIRE_EQUAL(fp.GetNextSequenceID(), 3);
624 1 : fps.clear();
625 :
626 : // Data-taking has passed request
627 1 : fp.AddFragmentsToBuffer(gen.Generate(12));
628 :
629 1 : buffer->push(5, 4);
630 :
631 1 : list = fp.GetSentWindowList(1); // Out-of-order list is only updated in getNext calls
632 4 : TRACE_REQUIRE_EQUAL(list.size(), 1);
633 1 : sts = fp.applyRequests(fps);
634 1 : list = fp.GetSentWindowList(1);
635 4 : TRACE_REQUIRE_EQUAL(fp.GetNextSequenceID(), 6);
636 4 : TRACE_REQUIRE_EQUAL(list.size(), 0);
637 4 : TRACE_REQUIRE_EQUAL(sts, true);
638 4 : TRACE_REQUIRE_EQUAL(fps.size(), 1);
639 4 : TRACE_REQUIRE_EQUAL(fps.front()->fragmentID(), 1);
640 4 : TRACE_REQUIRE_EQUAL(fps.front()->timestamp(), 4);
641 4 : TRACE_REQUIRE_EQUAL(fps.front()->sequenceID(), 5);
642 1 : type = artdaq::Fragment::ContainerFragmentType;
643 4 : TRACE_REQUIRE_EQUAL(fps.front()->type(), type);
644 1 : auto cf4 = artdaq::ContainerFragment(*fps.front());
645 4 : TRACE_REQUIRE_EQUAL(cf4.block_count(), 0);
646 4 : TRACE_REQUIRE_EQUAL(cf4.missing_data(), true);
647 1 : type = artdaq::Fragment::EmptyFragmentType;
648 4 : TRACE_REQUIRE_EQUAL(cf4.fragment_type(), type);
649 1 : fps.clear();
650 :
651 : // Out-of-order windows
652 1 : buffer->push(7, 13);
653 :
654 1 : sts = fp.applyRequests(fps);
655 4 : TRACE_REQUIRE_EQUAL(sts, true);
656 4 : TRACE_REQUIRE_EQUAL(fps.size(), 1);
657 4 : TRACE_REQUIRE_EQUAL(fps.front()->fragmentID(), 1);
658 4 : TRACE_REQUIRE_EQUAL(fps.front()->timestamp(), 13);
659 4 : TRACE_REQUIRE_EQUAL(fps.front()->sequenceID(), 7);
660 1 : type = artdaq::Fragment::ContainerFragmentType;
661 4 : TRACE_REQUIRE_EQUAL(fps.front()->type(), type);
662 1 : auto cf5 = artdaq::ContainerFragment(*fps.front());
663 4 : TRACE_REQUIRE_EQUAL(cf5.block_count(), 1);
664 4 : TRACE_REQUIRE_EQUAL(cf5.missing_data(), false);
665 1 : type = artdaq::Fragment::FirstUserFragmentType;
666 4 : TRACE_REQUIRE_EQUAL(cf5.fragment_type(), type);
667 4 : TRACE_REQUIRE_EQUAL(fp.GetNextSequenceID(), 6);
668 1 : fps.clear();
669 :
670 1 : list = fp.GetSentWindowList(1);
671 4 : TRACE_REQUIRE_EQUAL(list.size(), 1);
672 4 : TRACE_REQUIRE_EQUAL(list.begin()->first, 7);
673 :
674 1 : buffer->push(6, 12);
675 :
676 1 : sts = fp.applyRequests(fps);
677 4 : TRACE_REQUIRE_EQUAL(sts, true);
678 4 : TRACE_REQUIRE_EQUAL(fps.size(), 1);
679 4 : TRACE_REQUIRE_EQUAL(fps.front()->fragmentID(), 1);
680 4 : TRACE_REQUIRE_EQUAL(fps.front()->timestamp(), 12);
681 4 : TRACE_REQUIRE_EQUAL(fps.front()->sequenceID(), 6);
682 1 : type = artdaq::Fragment::ContainerFragmentType;
683 4 : TRACE_REQUIRE_EQUAL(fps.front()->type(), type);
684 1 : auto cf6 = artdaq::ContainerFragment(*fps.front());
685 4 : TRACE_REQUIRE_EQUAL(cf6.block_count(), 1);
686 4 : TRACE_REQUIRE_EQUAL(cf6.missing_data(), false);
687 1 : type = artdaq::Fragment::FirstUserFragmentType;
688 4 : TRACE_REQUIRE_EQUAL(cf6.fragment_type(), type);
689 1 : fps.clear();
690 4 : TRACE_REQUIRE_EQUAL(fp.GetNextSequenceID(), 8);
691 :
692 1 : list = fp.GetSentWindowList(1);
693 4 : TRACE_REQUIRE_EQUAL(list.size(), 0);
694 :
695 1 : usleep(1500000);
696 :
697 3 : TLOG(TLVL_INFO) << "WindowMode_Function test case END";
698 1 : }
699 :
700 : // 1. Both start and end before any data in buffer "RequestBeforeBuffer"
701 : // 2. Start before buffer, end in buffer "RequestStartsBeforeBuffer"
702 : // 3. Start before buffer, end after buffer "RequestOutsideBuffer"
703 : // 4. Start and end in buffer "RequestInBuffer"
704 : // 5. Start in buffer, end after buffer "RequestEndsAfterBuffer"
705 : // 6. Start and end after buffer "RequestAfterBuffer"
706 2 : BOOST_AUTO_TEST_CASE(WindowMode_RequestBeforeBuffer)
707 : {
708 1 : artdaq::configureMessageFacility("FragmentBuffer_t", true, MESSAGEFACILITY_DEBUG);
709 3 : TLOG(TLVL_INFO) << "WindowMode_RequestBeforeBuffer test case BEGIN";
710 1 : fhicl::ParameterSet ps;
711 2 : ps.put<int>("fragment_id", 1);
712 :
713 2 : ps.put<artdaq::Fragment::timestamp_t>("request_window_offset", 0);
714 2 : ps.put<artdaq::Fragment::timestamp_t>("request_window_width", 3);
715 2 : ps.put<bool>("circular_buffer_mode", true);
716 2 : ps.put<bool>("receive_requests", true);
717 3 : ps.put<size_t>("data_buffer_depth_fragments", 5);
718 3 : ps.put<std::string>("request_mode", "window");
719 :
720 1 : auto buffer = std::make_shared<artdaq::RequestBuffer>();
721 1 : buffer->setRunning(true);
722 1 : artdaqtest::FragmentBufferTestGenerator gen(ps);
723 1 : artdaq::FragmentBuffer fp(ps);
724 1 : fp.SetRequestBuffer(buffer);
725 :
726 1 : artdaq::FragmentPtrs fps;
727 : int sts;
728 : artdaq::Fragment::type_t type;
729 :
730 : // 1. Both start and end before any data in buffer
731 : // -- Should return ContainerFragment with MissingData bit set and zero Fragments
732 1 : fp.AddFragmentsToBuffer(gen.Generate(10)); // Buffer start is at ts 6, end at 10
733 :
734 1 : buffer->push(1, 1); // Requesting data from ts 1 to 3
735 :
736 1 : sts = fp.applyRequests(fps);
737 4 : TRACE_REQUIRE_EQUAL(sts, true);
738 4 : TRACE_REQUIRE_EQUAL(fps.size(), 1);
739 4 : TRACE_REQUIRE_EQUAL(fps.front()->fragmentID(), 1);
740 4 : TRACE_REQUIRE_EQUAL(fps.front()->timestamp(), 1);
741 4 : TRACE_REQUIRE_EQUAL(fps.front()->sequenceID(), 1);
742 1 : type = artdaq::Fragment::ContainerFragmentType;
743 4 : TRACE_REQUIRE_EQUAL(fps.front()->type(), type);
744 1 : auto cf4 = artdaq::ContainerFragment(*fps.front());
745 4 : TRACE_REQUIRE_EQUAL(cf4.block_count(), 0);
746 4 : TRACE_REQUIRE_EQUAL(cf4.missing_data(), true);
747 1 : type = artdaq::Fragment::EmptyFragmentType;
748 4 : TRACE_REQUIRE_EQUAL(cf4.fragment_type(), type);
749 :
750 3 : TLOG(TLVL_INFO) << "WindowMode_RequestBeforeBuffer test case END";
751 1 : }
752 2 : BOOST_AUTO_TEST_CASE(WindowMode_RequestStartsBeforeBuffer)
753 : {
754 1 : artdaq::configureMessageFacility("FragmentBuffer_t", true, MESSAGEFACILITY_DEBUG);
755 3 : TLOG(TLVL_INFO) << "WindowMode_RequestStartsBeforeBuffer test case BEGIN";
756 :
757 1 : fhicl::ParameterSet ps;
758 2 : ps.put<int>("fragment_id", 1);
759 2 : ps.put<artdaq::Fragment::timestamp_t>("request_window_offset", 0);
760 2 : ps.put<artdaq::Fragment::timestamp_t>("request_window_width", 3);
761 2 : ps.put<size_t>("data_buffer_depth_fragments", 5);
762 2 : ps.put<bool>("circular_buffer_mode", true);
763 3 : ps.put<bool>("receive_requests", true);
764 3 : ps.put<std::string>("request_mode", "window");
765 :
766 1 : auto buffer = std::make_shared<artdaq::RequestBuffer>();
767 1 : buffer->setRunning(true);
768 1 : artdaqtest::FragmentBufferTestGenerator gen(ps);
769 1 : artdaq::FragmentBuffer fp(ps);
770 1 : fp.SetRequestBuffer(buffer);
771 :
772 1 : artdaq::FragmentPtrs fps;
773 : int sts;
774 : artdaq::Fragment::type_t type;
775 :
776 1 : fp.AddFragmentsToBuffer(gen.Generate(10)); // Buffer contains 6 to 10
777 :
778 : // 2. Start before buffer, end in buffer
779 : // -- Should return ContainerFragment with MissingData bit set and one or more Fragments
780 1 : buffer->push(1, 4); // Requesting data from ts 4 to 6
781 :
782 1 : sts = fp.applyRequests(fps);
783 4 : TRACE_REQUIRE_EQUAL(sts, true);
784 4 : TRACE_REQUIRE_EQUAL(fps.size(), 1);
785 4 : TRACE_REQUIRE_EQUAL(fps.front()->fragmentID(), 1);
786 4 : TRACE_REQUIRE_EQUAL(fps.front()->timestamp(), 4);
787 4 : TRACE_REQUIRE_EQUAL(fps.front()->sequenceID(), 1);
788 1 : type = artdaq::Fragment::ContainerFragmentType;
789 4 : TRACE_REQUIRE_EQUAL(fps.front()->type(), type);
790 1 : auto cf4 = artdaq::ContainerFragment(*fps.front());
791 4 : TRACE_REQUIRE_EQUAL(cf4.block_count(), 1);
792 4 : TRACE_REQUIRE_EQUAL(cf4.missing_data(), true);
793 1 : type = artdaq::Fragment::FirstUserFragmentType;
794 4 : TRACE_REQUIRE_EQUAL(cf4.fragment_type(), type);
795 :
796 3 : TLOG(TLVL_INFO) << "WindowMode_RequestStartsBeforeBuffer test case END";
797 1 : }
798 2 : BOOST_AUTO_TEST_CASE(WindowMode_RequestOutsideBuffer)
799 : {
800 1 : artdaq::configureMessageFacility("FragmentBuffer_t", true, MESSAGEFACILITY_DEBUG);
801 3 : TLOG(TLVL_INFO) << "WindowMode_RequestOutsideBuffer test case BEGIN";
802 1 : fhicl::ParameterSet ps;
803 2 : ps.put<int>("fragment_id", 1);
804 2 : ps.put<artdaq::Fragment::timestamp_t>("request_window_offset", 0);
805 2 : ps.put<artdaq::Fragment::timestamp_t>("request_window_width", 4);
806 2 : ps.put<size_t>("window_close_timeout_us", 500000);
807 2 : ps.put<bool>("circular_buffer_mode", true);
808 2 : ps.put<size_t>("data_buffer_depth_fragments", 5);
809 3 : ps.put<bool>("receive_requests", true);
810 3 : ps.put<std::string>("request_mode", "window");
811 :
812 1 : auto buffer = std::make_shared<artdaq::RequestBuffer>();
813 1 : buffer->setRunning(true);
814 1 : artdaqtest::FragmentBufferTestGenerator gen(ps);
815 1 : artdaq::FragmentBuffer fp(ps);
816 1 : fp.SetRequestBuffer(buffer);
817 :
818 1 : artdaq::FragmentPtrs fps;
819 : int sts;
820 : artdaq::Fragment::type_t type;
821 :
822 1 : fp.AddFragmentsToBuffer(gen.Generate(10)); // Buffer contains 6 to 10
823 :
824 : // 3. Start before buffer, end after buffer
825 : // -- Should not return until buffer passes end or timeout (check both cases), MissingData bit set
826 :
827 1 : buffer->push(1, 6); // Requesting data from ts 6 to 9, buffer will contain 10
828 :
829 1 : sts = fp.applyRequests(fps);
830 4 : TRACE_REQUIRE_EQUAL(sts, true);
831 4 : TRACE_REQUIRE_EQUAL(fps.size(), 1);
832 4 : TRACE_REQUIRE_EQUAL(fps.front()->fragmentID(), 1);
833 4 : TRACE_REQUIRE_EQUAL(fps.front()->timestamp(), 6);
834 4 : TRACE_REQUIRE_EQUAL(fps.front()->sequenceID(), 1);
835 1 : type = artdaq::Fragment::ContainerFragmentType;
836 4 : TRACE_REQUIRE_EQUAL(fps.front()->type(), type);
837 1 : auto cf = artdaq::ContainerFragment(*fps.front());
838 4 : TRACE_REQUIRE_EQUAL(cf.block_count(), 4);
839 4 : TRACE_REQUIRE_EQUAL(cf.missing_data(), false);
840 1 : type = artdaq::Fragment::FirstUserFragmentType;
841 4 : TRACE_REQUIRE_EQUAL(cf.fragment_type(), type);
842 1 : fps.clear();
843 :
844 1 : buffer->push(2, 9); // Requesting data from ts 9 to 12
845 :
846 1 : sts = fp.applyRequests(fps);
847 4 : TRACE_REQUIRE_EQUAL(sts, true);
848 4 : TRACE_REQUIRE_EQUAL(fps.size(), 0);
849 :
850 1 : fp.AddFragmentsToBuffer(gen.Generate(3)); // Buffer start is at ts 10, end at 13
851 :
852 1 : sts = fp.applyRequests(fps);
853 4 : TRACE_REQUIRE_EQUAL(sts, true);
854 4 : TRACE_REQUIRE_EQUAL(fps.size(), 1);
855 4 : TRACE_REQUIRE_EQUAL(fps.front()->fragmentID(), 1);
856 4 : TRACE_REQUIRE_EQUAL(fps.front()->timestamp(), 9);
857 4 : TRACE_REQUIRE_EQUAL(fps.front()->sequenceID(), 2);
858 1 : type = artdaq::Fragment::ContainerFragmentType;
859 4 : TRACE_REQUIRE_EQUAL(fps.front()->type(), type);
860 1 : auto cf2 = artdaq::ContainerFragment(*fps.front());
861 4 : TRACE_REQUIRE_EQUAL(cf2.block_count(), 3);
862 4 : TRACE_REQUIRE_EQUAL(cf2.missing_data(), true);
863 1 : type = artdaq::Fragment::FirstUserFragmentType;
864 4 : TRACE_REQUIRE_EQUAL(cf2.fragment_type(), type);
865 1 : fps.clear();
866 :
867 1 : buffer->push(3, 12); // Requesting data from ts 12 to 15
868 :
869 1 : sts = fp.applyRequests(fps);
870 4 : TRACE_REQUIRE_EQUAL(sts, true);
871 4 : TRACE_REQUIRE_EQUAL(fps.size(), 0);
872 :
873 1 : usleep(550000);
874 :
875 1 : sts = fp.applyRequests(fps);
876 4 : TRACE_REQUIRE_EQUAL(sts, true);
877 4 : TRACE_REQUIRE_EQUAL(fps.size(), 1);
878 4 : TRACE_REQUIRE_EQUAL(fps.front()->fragmentID(), 1);
879 4 : TRACE_REQUIRE_EQUAL(fps.front()->timestamp(), 12);
880 4 : TRACE_REQUIRE_EQUAL(fps.front()->sequenceID(), 3);
881 1 : type = artdaq::Fragment::ContainerFragmentType;
882 4 : TRACE_REQUIRE_EQUAL(fps.front()->type(), type);
883 1 : auto cf4 = artdaq::ContainerFragment(*fps.front());
884 4 : TRACE_REQUIRE_EQUAL(cf4.block_count(), 1);
885 4 : TRACE_REQUIRE_EQUAL(cf4.missing_data(), true);
886 1 : type = artdaq::Fragment::FirstUserFragmentType;
887 4 : TRACE_REQUIRE_EQUAL(cf4.fragment_type(), type);
888 :
889 3 : TLOG(TLVL_INFO) << "WindowMode_RequestOutsideBuffer test case END";
890 1 : }
891 2 : BOOST_AUTO_TEST_CASE(WindowMode_RequestInBuffer)
892 : {
893 1 : artdaq::configureMessageFacility("FragmentBuffer_t", true, MESSAGEFACILITY_DEBUG);
894 3 : TLOG(TLVL_INFO) << "WindowMode_RequestInBuffer test case BEGIN";
895 1 : fhicl::ParameterSet ps;
896 2 : ps.put<int>("fragment_id", 1);
897 2 : ps.put<artdaq::Fragment::timestamp_t>("request_window_offset", 0);
898 2 : ps.put<artdaq::Fragment::timestamp_t>("request_window_width", 3);
899 2 : ps.put<bool>("circular_buffer_mode", true);
900 2 : ps.put<size_t>("data_buffer_depth_fragments", 5);
901 3 : ps.put<bool>("receive_requests", true);
902 3 : ps.put<std::string>("request_mode", "window");
903 :
904 1 : auto buffer = std::make_shared<artdaq::RequestBuffer>();
905 1 : buffer->setRunning(true);
906 1 : artdaqtest::FragmentBufferTestGenerator gen(ps);
907 1 : artdaq::FragmentBuffer fp(ps);
908 1 : fp.SetRequestBuffer(buffer);
909 :
910 1 : artdaq::FragmentPtrs fps;
911 : int sts;
912 : artdaq::Fragment::type_t type;
913 :
914 : // 4. Start and end in buffer
915 : // -- Should return ContainerFragment with one or more Fragments
916 1 : fp.AddFragmentsToBuffer(gen.Generate(6)); // Buffer start is at ts 2, end at 6
917 :
918 1 : buffer->push(1, 3); // Requesting data from ts 3 to 5
919 :
920 1 : sts = fp.applyRequests(fps);
921 4 : TRACE_REQUIRE_EQUAL(sts, true);
922 4 : TRACE_REQUIRE_EQUAL(fps.size(), 1);
923 4 : TRACE_REQUIRE_EQUAL(fps.front()->fragmentID(), 1);
924 4 : TRACE_REQUIRE_EQUAL(fps.front()->timestamp(), 3);
925 4 : TRACE_REQUIRE_EQUAL(fps.front()->sequenceID(), 1);
926 1 : type = artdaq::Fragment::ContainerFragmentType;
927 4 : TRACE_REQUIRE_EQUAL(fps.front()->type(), type);
928 1 : auto cf4 = artdaq::ContainerFragment(*fps.front());
929 4 : TRACE_REQUIRE_EQUAL(cf4.block_count(), 3);
930 4 : TRACE_REQUIRE_EQUAL(cf4.missing_data(), false);
931 1 : type = artdaq::Fragment::FirstUserFragmentType;
932 4 : TRACE_REQUIRE_EQUAL(cf4.fragment_type(), type);
933 :
934 3 : TLOG(TLVL_INFO) << "WindowMode_RequestInBuffer test case END";
935 1 : }
936 2 : BOOST_AUTO_TEST_CASE(WindowMode_RequestEndsAfterBuffer)
937 : {
938 1 : artdaq::configureMessageFacility("FragmentBuffer_t", true, MESSAGEFACILITY_DEBUG);
939 3 : TLOG(TLVL_INFO) << "WindowMode_RequestEndsAfterBuffer test case BEGIN";
940 1 : fhicl::ParameterSet ps;
941 2 : ps.put<int>("fragment_id", 1);
942 2 : ps.put<artdaq::Fragment::timestamp_t>("request_window_offset", 0);
943 2 : ps.put<artdaq::Fragment::timestamp_t>("request_window_width", 3);
944 2 : ps.put<size_t>("window_close_timeout_us", 500000);
945 2 : ps.put<bool>("circular_buffer_mode", true);
946 2 : ps.put<bool>("receive_requests", true);
947 3 : ps.put<size_t>("data_buffer_depth_fragments", 5);
948 3 : ps.put<std::string>("request_mode", "window");
949 :
950 1 : auto buffer = std::make_shared<artdaq::RequestBuffer>();
951 1 : buffer->setRunning(true);
952 1 : artdaqtest::FragmentBufferTestGenerator gen(ps);
953 1 : artdaq::FragmentBuffer fp(ps);
954 1 : fp.SetRequestBuffer(buffer);
955 :
956 1 : artdaq::FragmentPtrs fps;
957 : int sts;
958 : artdaq::Fragment::type_t type;
959 :
960 1 : fp.AddFragmentsToBuffer(gen.Generate(6)); // Buffer contains 2 to 6
961 :
962 : // 5. Start in buffer, end after buffer
963 : // -- Should not return until buffer passes end or timeout (check both cases). MissingData bit set if timeout
964 1 : buffer->push(1, 5); // Requesting data from ts 5 to 7
965 :
966 1 : sts = fp.applyRequests(fps);
967 4 : TRACE_REQUIRE_EQUAL(sts, true);
968 4 : TRACE_REQUIRE_EQUAL(fps.size(), 0);
969 :
970 1 : fp.AddFragmentsToBuffer(gen.Generate(2)); // Buffer contains 4 to 8
971 :
972 1 : sts = fp.applyRequests(fps);
973 4 : TRACE_REQUIRE_EQUAL(sts, true);
974 4 : TRACE_REQUIRE_EQUAL(fps.size(), 1);
975 4 : TRACE_REQUIRE_EQUAL(fps.front()->fragmentID(), 1);
976 4 : TRACE_REQUIRE_EQUAL(fps.front()->timestamp(), 5);
977 4 : TRACE_REQUIRE_EQUAL(fps.front()->sequenceID(), 1);
978 1 : type = artdaq::Fragment::ContainerFragmentType;
979 4 : TRACE_REQUIRE_EQUAL(fps.front()->type(), type);
980 1 : auto cf = artdaq::ContainerFragment(*fps.front());
981 4 : TRACE_REQUIRE_EQUAL(cf.block_count(), 3);
982 4 : TRACE_REQUIRE_EQUAL(cf.missing_data(), false);
983 1 : type = artdaq::Fragment::FirstUserFragmentType;
984 4 : TRACE_REQUIRE_EQUAL(cf.fragment_type(), type);
985 1 : fps.clear();
986 :
987 1 : buffer->push(2, 8); // Requesting data from ts 8 to 10
988 :
989 1 : sts = fp.applyRequests(fps);
990 4 : TRACE_REQUIRE_EQUAL(sts, true);
991 4 : TRACE_REQUIRE_EQUAL(fps.size(), 0);
992 :
993 1 : usleep(550000);
994 :
995 1 : sts = fp.applyRequests(fps);
996 4 : TRACE_REQUIRE_EQUAL(sts, true);
997 4 : TRACE_REQUIRE_EQUAL(fps.size(), 1);
998 4 : TRACE_REQUIRE_EQUAL(fps.front()->fragmentID(), 1);
999 4 : TRACE_REQUIRE_EQUAL(fps.front()->timestamp(), 8);
1000 4 : TRACE_REQUIRE_EQUAL(fps.front()->sequenceID(), 2);
1001 1 : type = artdaq::Fragment::ContainerFragmentType;
1002 4 : TRACE_REQUIRE_EQUAL(fps.front()->type(), type);
1003 1 : auto cf4 = artdaq::ContainerFragment(*fps.front());
1004 4 : TRACE_REQUIRE_EQUAL(cf4.block_count(), 1);
1005 4 : TRACE_REQUIRE_EQUAL(cf4.missing_data(), true);
1006 1 : type = artdaq::Fragment::FirstUserFragmentType;
1007 4 : TRACE_REQUIRE_EQUAL(cf4.fragment_type(), type);
1008 :
1009 3 : TLOG(TLVL_INFO) << "WindowMode_RequestEndsAfterBuffer test case END";
1010 1 : }
1011 2 : BOOST_AUTO_TEST_CASE(WindowMode_RequestAfterBuffer)
1012 : {
1013 1 : artdaq::configureMessageFacility("FragmentBuffer_t", true, MESSAGEFACILITY_DEBUG);
1014 3 : TLOG(TLVL_INFO) << "WindowMode_RequestAfterBuffer test case BEGIN";
1015 1 : fhicl::ParameterSet ps;
1016 2 : ps.put<int>("fragment_id", 1);
1017 2 : ps.put<artdaq::Fragment::timestamp_t>("request_window_offset", 0);
1018 2 : ps.put<artdaq::Fragment::timestamp_t>("request_window_width", 3);
1019 2 : ps.put<size_t>("window_close_timeout_us", 500000);
1020 2 : ps.put<bool>("circular_buffer_mode", true);
1021 2 : ps.put<bool>("receive_requests", true);
1022 3 : ps.put<size_t>("data_buffer_depth_fragments", 5);
1023 3 : ps.put<std::string>("request_mode", "window");
1024 :
1025 1 : auto buffer = std::make_shared<artdaq::RequestBuffer>();
1026 1 : buffer->setRunning(true);
1027 1 : artdaqtest::FragmentBufferTestGenerator gen(ps);
1028 1 : artdaq::FragmentBuffer fp(ps);
1029 1 : fp.SetRequestBuffer(buffer);
1030 :
1031 1 : artdaq::FragmentPtrs fps;
1032 : int sts;
1033 : artdaq::Fragment::type_t type;
1034 :
1035 : // 6. Start and end after buffer
1036 : // -- Should not return until buffer passes end or timeout (check both cases). MissingData bit set if timeout
1037 1 : fp.AddFragmentsToBuffer(gen.Generate(10)); // Buffer start is 6, end at 10
1038 :
1039 1 : buffer->push(1, 11); // Requesting data from ts 11 to 13
1040 :
1041 1 : sts = fp.applyRequests(fps);
1042 4 : TRACE_REQUIRE_EQUAL(sts, true);
1043 4 : TRACE_REQUIRE_EQUAL(fps.size(), 0);
1044 1 : fp.AddFragmentsToBuffer(gen.Generate(1)); // Buffer start is 7, end at 11
1045 :
1046 1 : sts = fp.applyRequests(fps);
1047 4 : TRACE_REQUIRE_EQUAL(sts, true);
1048 4 : TRACE_REQUIRE_EQUAL(fps.size(), 0);
1049 :
1050 1 : fp.AddFragmentsToBuffer(gen.Generate(3)); // Buffer start is 10, end at 14
1051 :
1052 1 : sts = fp.applyRequests(fps);
1053 4 : TRACE_REQUIRE_EQUAL(sts, true);
1054 4 : TRACE_REQUIRE_EQUAL(fps.size(), 1);
1055 4 : TRACE_REQUIRE_EQUAL(fps.front()->fragmentID(), 1);
1056 4 : TRACE_REQUIRE_EQUAL(fps.front()->timestamp(), 11);
1057 4 : TRACE_REQUIRE_EQUAL(fps.front()->sequenceID(), 1);
1058 1 : type = artdaq::Fragment::ContainerFragmentType;
1059 4 : TRACE_REQUIRE_EQUAL(fps.front()->type(), type);
1060 1 : auto cf = artdaq::ContainerFragment(*fps.front());
1061 4 : TRACE_REQUIRE_EQUAL(cf.block_count(), 3);
1062 4 : TRACE_REQUIRE_EQUAL(cf.missing_data(), false);
1063 1 : type = artdaq::Fragment::FirstUserFragmentType;
1064 4 : TRACE_REQUIRE_EQUAL(cf.fragment_type(), type);
1065 1 : fps.clear();
1066 :
1067 1 : buffer->push(2, 16); // Requesting data from ts 16 to 18
1068 :
1069 1 : sts = fp.applyRequests(fps);
1070 4 : TRACE_REQUIRE_EQUAL(sts, true);
1071 4 : TRACE_REQUIRE_EQUAL(fps.size(), 0);
1072 :
1073 1 : usleep(550000);
1074 :
1075 1 : sts = fp.applyRequests(fps);
1076 4 : TRACE_REQUIRE_EQUAL(sts, true);
1077 4 : TRACE_REQUIRE_EQUAL(fps.size(), 1);
1078 4 : TRACE_REQUIRE_EQUAL(fps.front()->fragmentID(), 1);
1079 4 : TRACE_REQUIRE_EQUAL(fps.front()->timestamp(), 16);
1080 4 : TRACE_REQUIRE_EQUAL(fps.front()->sequenceID(), 2);
1081 1 : type = artdaq::Fragment::ContainerFragmentType;
1082 4 : TRACE_REQUIRE_EQUAL(fps.front()->type(), type);
1083 1 : auto cf4 = artdaq::ContainerFragment(*fps.front());
1084 4 : TRACE_REQUIRE_EQUAL(cf4.block_count(), 0);
1085 4 : TRACE_REQUIRE_EQUAL(cf4.missing_data(), true);
1086 1 : type = artdaq::Fragment::EmptyFragmentType;
1087 4 : TRACE_REQUIRE_EQUAL(cf4.fragment_type(), type);
1088 :
1089 3 : TLOG(TLVL_INFO) << "WindowMode_RequestAfterBuffer test case END";
1090 1 : }
1091 :
1092 2 : BOOST_AUTO_TEST_CASE(SequenceIDMode)
1093 : {
1094 1 : artdaq::configureMessageFacility("FragmentBuffer_t", true, MESSAGEFACILITY_DEBUG);
1095 3 : TLOG(TLVL_INFO) << "SequenceIDMode test case BEGIN";
1096 1 : fhicl::ParameterSet ps;
1097 2 : ps.put<int>("fragment_id", 1);
1098 2 : ps.put<artdaq::Fragment::timestamp_t>("request_window_offset", 0);
1099 2 : ps.put<artdaq::Fragment::timestamp_t>("request_window_width", 0);
1100 3 : ps.put<bool>("receive_requests", true);
1101 3 : ps.put<std::string>("request_mode", "SequenceID");
1102 :
1103 1 : auto buffer = std::make_shared<artdaq::RequestBuffer>();
1104 1 : buffer->setRunning(true);
1105 1 : artdaqtest::FragmentBufferTestGenerator gen(ps);
1106 1 : artdaq::FragmentBuffer fp(ps);
1107 1 : fp.SetRequestBuffer(buffer);
1108 :
1109 1 : buffer->push(1, 1);
1110 1 : fp.AddFragmentsToBuffer(gen.Generate(1));
1111 :
1112 4 : TRACE_REQUIRE_EQUAL(fp.GetNextSequenceID(), 1);
1113 :
1114 : // Test that Fragment with matching Sequence ID and timestamp is returned
1115 1 : artdaq::FragmentPtrs fps;
1116 1 : auto sts = fp.applyRequests(fps);
1117 1 : auto type = artdaq::Fragment::FirstUserFragmentType;
1118 4 : TRACE_REQUIRE_EQUAL(sts, true);
1119 4 : TRACE_REQUIRE_EQUAL(fps.size(), 1u);
1120 4 : TRACE_REQUIRE_EQUAL(fps.front()->fragmentID(), 1);
1121 4 : TRACE_REQUIRE_EQUAL(fps.front()->timestamp(), 1);
1122 4 : TRACE_REQUIRE_EQUAL(fps.front()->sequenceID(), 1);
1123 4 : TRACE_REQUIRE_EQUAL(fps.front()->type(), type);
1124 4 : TRACE_REQUIRE_EQUAL(fp.GetNextSequenceID(), 2);
1125 1 : fps.clear();
1126 :
1127 : // Test that no Fragment is returned when one does not exist in the buffer
1128 1 : buffer->push(2, 5);
1129 :
1130 1 : sts = fp.applyRequests(fps);
1131 4 : TRACE_REQUIRE_EQUAL(sts, true);
1132 4 : TRACE_REQUIRE_EQUAL(fps.size(), 0u);
1133 :
1134 : // Test that Fragment with matching Sequence ID and non-matching timestamp is returned
1135 1 : fp.AddFragmentsToBuffer(gen.Generate(1));
1136 :
1137 1 : sts = fp.applyRequests(fps);
1138 4 : TRACE_REQUIRE_EQUAL(sts, true);
1139 4 : TRACE_REQUIRE_EQUAL(fps.size(), 1u);
1140 4 : TRACE_REQUIRE_EQUAL(fps.front()->fragmentID(), 1);
1141 4 : TRACE_REQUIRE_EQUAL(fps.front()->timestamp(), 2);
1142 4 : TRACE_REQUIRE_EQUAL(fps.front()->sequenceID(), 2);
1143 4 : TRACE_REQUIRE_EQUAL(fps.front()->type(), type);
1144 4 : TRACE_REQUIRE_EQUAL(fp.GetNextSequenceID(), 3);
1145 1 : fps.clear();
1146 :
1147 : // Test out-of-order requests, with non-matching timestamps
1148 1 : fp.AddFragmentsToBuffer(gen.Generate(2));
1149 :
1150 1 : buffer->push(4, 7);
1151 :
1152 1 : sts = fp.applyRequests(fps);
1153 4 : TRACE_REQUIRE_EQUAL(fp.GetNextSequenceID(), 3);
1154 4 : TRACE_REQUIRE_EQUAL(sts, true);
1155 4 : TRACE_REQUIRE_EQUAL(fps.size(), 1u);
1156 4 : TRACE_REQUIRE_EQUAL(fps.front()->fragmentID(), 1);
1157 4 : TRACE_REQUIRE_EQUAL(fps.front()->timestamp(), 4);
1158 4 : TRACE_REQUIRE_EQUAL(fps.front()->sequenceID(), 4);
1159 4 : TRACE_REQUIRE_EQUAL(fps.front()->type(), type);
1160 1 : fps.clear();
1161 :
1162 1 : buffer->push(3, 6);
1163 :
1164 1 : sts = fp.applyRequests(fps);
1165 4 : TRACE_REQUIRE_EQUAL(fp.GetNextSequenceID(), 5);
1166 4 : TRACE_REQUIRE_EQUAL(sts, true);
1167 4 : TRACE_REQUIRE_EQUAL(fps.size(), 1u);
1168 4 : TRACE_REQUIRE_EQUAL(fps.front()->fragmentID(), 1);
1169 4 : TRACE_REQUIRE_EQUAL(fps.front()->timestamp(), 3);
1170 4 : TRACE_REQUIRE_EQUAL(fps.front()->sequenceID(), 3);
1171 4 : TRACE_REQUIRE_EQUAL(fps.front()->type(), type);
1172 :
1173 3 : TLOG(TLVL_INFO) << "SequenceIDMode test case END";
1174 1 : }
1175 :
1176 2 : BOOST_AUTO_TEST_CASE(IgnoreRequests_MultipleIDs)
1177 : {
1178 1 : artdaq::configureMessageFacility("FragmentBuffer_t", true, MESSAGEFACILITY_DEBUG);
1179 3 : TLOG(TLVL_INFO) << "IgnoreRequests_MultipleIDs test case BEGIN";
1180 1 : fhicl::ParameterSet ps;
1181 4 : ps.put<std::vector<int>>("fragment_ids", {1, 2, 3});
1182 2 : ps.put<artdaq::Fragment::timestamp_t>("request_window_offset", 0);
1183 3 : ps.put<artdaq::Fragment::timestamp_t>("request_window_width", 0);
1184 3 : ps.put<std::string>("request_mode", "ignored");
1185 :
1186 1 : auto buffer = std::make_shared<artdaq::RequestBuffer>();
1187 1 : buffer->setRunning(true);
1188 1 : artdaqtest::FragmentBufferTestGenerator gen(ps);
1189 1 : artdaq::FragmentBuffer fp(ps);
1190 1 : fp.SetRequestBuffer(buffer);
1191 :
1192 1 : buffer->push(53, 35);
1193 1 : fp.AddFragmentsToBuffer(gen.Generate(1));
1194 :
1195 1 : artdaq::FragmentPtrs fps;
1196 1 : std::map<artdaq::Fragment::fragment_id_t, size_t> ids;
1197 1 : auto sts = fp.applyRequests(fps);
1198 4 : TRACE_REQUIRE_EQUAL(sts, true);
1199 4 : TRACE_REQUIRE_EQUAL(fps.size(), 3u);
1200 4 : while (fps.size() > 0)
1201 : {
1202 3 : ids[fps.front()->fragmentID()]++;
1203 12 : TRACE_REQUIRE_EQUAL(fps.front()->timestamp(), 1);
1204 12 : TRACE_REQUIRE_EQUAL(fps.front()->sequenceID(), 1);
1205 3 : fps.pop_front();
1206 : }
1207 :
1208 4 : TRACE_REQUIRE_EQUAL(ids[1], 1);
1209 4 : TRACE_REQUIRE_EQUAL(ids[2], 1);
1210 4 : TRACE_REQUIRE_EQUAL(ids[3], 1);
1211 1 : ids.clear();
1212 :
1213 1 : fps.clear();
1214 :
1215 3 : TLOG(TLVL_INFO) << "IgnoreRequests_MultipleIDs test case END";
1216 1 : }
1217 :
1218 2 : BOOST_AUTO_TEST_CASE(SingleMode_MultipleIDs)
1219 : {
1220 1 : artdaq::configureMessageFacility("FragmentBuffer_t", true, MESSAGEFACILITY_DEBUG);
1221 3 : TLOG(TLVL_INFO) << "SingleMode_MultipleIDs test case BEGIN";
1222 1 : fhicl::ParameterSet ps;
1223 4 : ps.put<std::vector<int>>("fragment_ids", {1, 2, 3});
1224 2 : ps.put<artdaq::Fragment::timestamp_t>("request_window_offset", 0);
1225 2 : ps.put<bool>("receive_requests", true);
1226 3 : ps.put<artdaq::Fragment::timestamp_t>("request_window_width", 0);
1227 3 : ps.put<std::string>("request_mode", "single");
1228 :
1229 1 : auto buffer = std::make_shared<artdaq::RequestBuffer>();
1230 1 : buffer->setRunning(true);
1231 1 : artdaqtest::FragmentBufferTestGenerator gen(ps);
1232 1 : artdaq::FragmentBuffer fp(ps);
1233 1 : fp.SetRequestBuffer(buffer);
1234 :
1235 1 : buffer->push(1, 1);
1236 1 : fp.AddFragmentsToBuffer(gen.Generate(1));
1237 :
1238 4 : TRACE_REQUIRE_EQUAL(fp.GetNextSequenceID(), 1);
1239 :
1240 1 : artdaq::FragmentPtrs fps;
1241 1 : std::map<artdaq::Fragment::fragment_id_t, size_t> ids;
1242 1 : auto sts = fp.applyRequests(fps);
1243 1 : auto type = artdaq::Fragment::FirstUserFragmentType;
1244 4 : TRACE_REQUIRE_EQUAL(sts, true);
1245 4 : TRACE_REQUIRE_EQUAL(fps.size(), 3u);
1246 4 : while (fps.size() > 0)
1247 : {
1248 3 : ids[fps.front()->fragmentID()]++;
1249 12 : TRACE_REQUIRE_EQUAL(fps.front()->timestamp(), 1);
1250 12 : TRACE_REQUIRE_EQUAL(fps.front()->sequenceID(), 1);
1251 12 : TRACE_REQUIRE_EQUAL(fps.front()->type(), type);
1252 3 : fps.pop_front();
1253 : }
1254 4 : TRACE_REQUIRE_EQUAL(ids[1], 1);
1255 4 : TRACE_REQUIRE_EQUAL(ids[2], 1);
1256 4 : TRACE_REQUIRE_EQUAL(ids[3], 1);
1257 1 : ids.clear();
1258 :
1259 4 : TRACE_REQUIRE_EQUAL(fp.GetNextSequenceID(), 2);
1260 1 : fps.clear();
1261 :
1262 1 : buffer->push(2, 5);
1263 :
1264 1 : sts = fp.applyRequests(fps);
1265 4 : TRACE_REQUIRE_EQUAL(sts, true);
1266 4 : TRACE_REQUIRE_EQUAL(fps.size(), 3u);
1267 4 : while (fps.size() > 0)
1268 : {
1269 3 : ids[fps.front()->fragmentID()]++;
1270 12 : TRACE_REQUIRE_EQUAL(fps.front()->timestamp(), 5);
1271 12 : TRACE_REQUIRE_EQUAL(fps.front()->sequenceID(), 2);
1272 12 : TRACE_REQUIRE_EQUAL(fps.front()->type(), type);
1273 3 : fps.pop_front();
1274 : }
1275 4 : TRACE_REQUIRE_EQUAL(ids[1], 1);
1276 4 : TRACE_REQUIRE_EQUAL(ids[2], 1);
1277 4 : TRACE_REQUIRE_EQUAL(ids[3], 1);
1278 1 : ids.clear();
1279 :
1280 4 : TRACE_REQUIRE_EQUAL(fp.GetNextSequenceID(), 3);
1281 1 : fps.clear();
1282 :
1283 1 : fp.AddFragmentsToBuffer(gen.Generate(2));
1284 :
1285 1 : buffer->push(4, 7);
1286 :
1287 1 : sts = fp.applyRequests(fps);
1288 4 : TRACE_REQUIRE_EQUAL(fp.GetNextSequenceID(), 5);
1289 4 : TRACE_REQUIRE_EQUAL(sts, true);
1290 4 : TRACE_REQUIRE_EQUAL(fps.size(), 6);
1291 1 : auto ts = artdaq::Fragment::InvalidTimestamp;
1292 1 : auto emptyType = artdaq::Fragment::EmptyFragmentType;
1293 4 : for (auto ii = 0; ii < 3; ++ii)
1294 : {
1295 3 : ids[fps.front()->fragmentID()]++;
1296 12 : TRACE_REQUIRE_EQUAL(fps.front()->timestamp(), ts);
1297 12 : TRACE_REQUIRE_EQUAL(fps.front()->sequenceID(), 3);
1298 12 : TRACE_REQUIRE_EQUAL(fps.front()->type(), emptyType);
1299 3 : fps.pop_front();
1300 : }
1301 4 : TRACE_REQUIRE_EQUAL(ids[1], 1);
1302 4 : TRACE_REQUIRE_EQUAL(ids[2], 1);
1303 4 : TRACE_REQUIRE_EQUAL(ids[3], 1);
1304 1 : ids.clear();
1305 4 : for (auto ii = 0; ii < 3; ++ii)
1306 : {
1307 3 : ids[fps.front()->fragmentID()]++;
1308 12 : TRACE_REQUIRE_EQUAL(fps.front()->timestamp(), 7);
1309 12 : TRACE_REQUIRE_EQUAL(fps.front()->sequenceID(), 4);
1310 12 : TRACE_REQUIRE_EQUAL(fps.front()->type(), type);
1311 3 : fps.pop_front();
1312 : }
1313 4 : TRACE_REQUIRE_EQUAL(ids[1], 1);
1314 4 : TRACE_REQUIRE_EQUAL(ids[2], 1);
1315 4 : TRACE_REQUIRE_EQUAL(ids[3], 1);
1316 1 : ids.clear();
1317 1 : fps.clear();
1318 :
1319 : // Single mode should generate 3 Fragments, 2 new ones and one old one
1320 1 : fp.AddFragmentsToBuffer(gen.Generate(1));
1321 :
1322 1 : buffer->push(5, 9);
1323 :
1324 1 : sts = fp.applyRequests(fps);
1325 4 : TRACE_REQUIRE_EQUAL(fp.GetNextSequenceID(), 6);
1326 4 : TRACE_REQUIRE_EQUAL(sts, true);
1327 4 : TRACE_REQUIRE_EQUAL(fps.size(), 3);
1328 4 : for (auto ii = 0; ii < 3; ++ii)
1329 : {
1330 3 : ids[fps.front()->fragmentID()]++;
1331 12 : TRACE_REQUIRE_EQUAL(fps.front()->timestamp(), 9);
1332 12 : TRACE_REQUIRE_EQUAL(fps.front()->sequenceID(), 5);
1333 12 : TRACE_REQUIRE_EQUAL(fps.front()->type(), type);
1334 3 : fps.pop_front();
1335 : }
1336 4 : TRACE_REQUIRE_EQUAL(ids[1], 1);
1337 4 : TRACE_REQUIRE_EQUAL(ids[2], 1);
1338 4 : TRACE_REQUIRE_EQUAL(ids[3], 1);
1339 1 : ids.clear();
1340 1 : fps.clear();
1341 :
1342 3 : TLOG(TLVL_INFO) << "SingleMode_MultipleIDs test case END";
1343 1 : }
1344 :
1345 2 : BOOST_AUTO_TEST_CASE(BufferMode_MultipleIDs)
1346 : {
1347 1 : artdaq::configureMessageFacility("FragmentBuffer_t", true, MESSAGEFACILITY_DEBUG);
1348 3 : TLOG(TLVL_INFO) << "BufferMode_MultipleIDs test case BEGIN";
1349 1 : fhicl::ParameterSet ps;
1350 4 : ps.put<std::vector<int>>("fragment_ids", {1, 2, 3});
1351 2 : ps.put<artdaq::Fragment::timestamp_t>("request_window_offset", 0);
1352 2 : ps.put<artdaq::Fragment::timestamp_t>("request_window_width", 0);
1353 3 : ps.put<bool>("receive_requests", true);
1354 3 : ps.put<std::string>("request_mode", "buffer");
1355 :
1356 1 : auto buffer = std::make_shared<artdaq::RequestBuffer>();
1357 1 : buffer->setRunning(true);
1358 1 : artdaqtest::FragmentBufferTestGenerator gen(ps);
1359 1 : artdaq::FragmentBuffer fp(ps);
1360 1 : fp.SetRequestBuffer(buffer);
1361 :
1362 1 : buffer->push(1, 1);
1363 1 : fp.AddFragmentsToBuffer(gen.Generate(1));
1364 :
1365 4 : TRACE_REQUIRE_EQUAL(fp.GetNextSequenceID(), 1);
1366 :
1367 1 : artdaq::FragmentPtrs fps;
1368 1 : std::map<artdaq::Fragment::fragment_id_t, size_t> ids;
1369 1 : auto sts = fp.applyRequests(fps);
1370 4 : TRACE_REQUIRE_EQUAL(sts, true);
1371 4 : TRACE_REQUIRE_EQUAL(fps.size(), 3u);
1372 1 : auto type = artdaq::Fragment::ContainerFragmentType;
1373 4 : while (fps.size() > 0)
1374 : {
1375 3 : ids[fps.front()->fragmentID()]++;
1376 12 : TRACE_REQUIRE_EQUAL(fps.front()->timestamp(), 1);
1377 12 : TRACE_REQUIRE_EQUAL(fps.front()->sequenceID(), 1);
1378 3 : type = artdaq::Fragment::ContainerFragmentType;
1379 12 : TRACE_REQUIRE_EQUAL(fps.front()->type(), type);
1380 3 : BOOST_REQUIRE_GE(fps.front()->sizeBytes(), 2 * sizeof(artdaq::detail::RawFragmentHeader) + sizeof(artdaq::ContainerFragment::Metadata));
1381 3 : auto cf = artdaq::ContainerFragment(*fps.front());
1382 12 : TRACE_REQUIRE_EQUAL(cf.block_count(), 1);
1383 12 : TRACE_REQUIRE_EQUAL(cf.missing_data(), false);
1384 3 : type = artdaq::Fragment::FirstUserFragmentType;
1385 12 : TRACE_REQUIRE_EQUAL(cf.fragment_type(), type);
1386 3 : fps.pop_front();
1387 3 : }
1388 4 : TRACE_REQUIRE_EQUAL(ids[1], 1);
1389 4 : TRACE_REQUIRE_EQUAL(ids[2], 1);
1390 4 : TRACE_REQUIRE_EQUAL(ids[3], 1);
1391 1 : ids.clear();
1392 4 : TRACE_REQUIRE_EQUAL(fp.GetNextSequenceID(), 2);
1393 1 : fps.clear();
1394 :
1395 1 : buffer->push(2, 5);
1396 :
1397 1 : sts = fp.applyRequests(fps);
1398 4 : TRACE_REQUIRE_EQUAL(sts, true);
1399 4 : TRACE_REQUIRE_EQUAL(fps.size(), 3u);
1400 4 : while (fps.size() > 0)
1401 : {
1402 3 : ids[fps.front()->fragmentID()]++;
1403 12 : TRACE_REQUIRE_EQUAL(fps.front()->timestamp(), 5);
1404 12 : TRACE_REQUIRE_EQUAL(fps.front()->sequenceID(), 2);
1405 3 : type = artdaq::Fragment::ContainerFragmentType;
1406 12 : TRACE_REQUIRE_EQUAL(fps.front()->type(), type);
1407 3 : auto cf = artdaq::ContainerFragment(*fps.front());
1408 12 : TRACE_REQUIRE_EQUAL(cf.block_count(), 0);
1409 12 : TRACE_REQUIRE_EQUAL(cf.missing_data(), false);
1410 3 : type = artdaq::Fragment::EmptyFragmentType;
1411 12 : TRACE_REQUIRE_EQUAL(cf.fragment_type(), type);
1412 3 : fps.pop_front();
1413 3 : }
1414 4 : TRACE_REQUIRE_EQUAL(ids[1], 1);
1415 4 : TRACE_REQUIRE_EQUAL(ids[2], 1);
1416 4 : TRACE_REQUIRE_EQUAL(ids[3], 1);
1417 1 : ids.clear();
1418 4 : TRACE_REQUIRE_EQUAL(fp.GetNextSequenceID(), 3);
1419 1 : fps.clear();
1420 :
1421 1 : fp.AddFragmentsToBuffer(gen.Generate(2));
1422 :
1423 1 : buffer->push(4, 7);
1424 :
1425 1 : sts = fp.applyRequests(fps);
1426 4 : TRACE_REQUIRE_EQUAL(sts, true);
1427 4 : TRACE_REQUIRE_EQUAL(fps.size(), 6);
1428 :
1429 1 : auto ts = artdaq::Fragment::InvalidTimestamp;
1430 1 : auto emptyType = artdaq::Fragment::EmptyFragmentType;
1431 4 : for (auto ii = 0; ii < 3; ++ii)
1432 : {
1433 3 : ids[fps.front()->fragmentID()]++;
1434 12 : TRACE_REQUIRE_EQUAL(fps.front()->timestamp(), ts);
1435 12 : TRACE_REQUIRE_EQUAL(fps.front()->sequenceID(), 3);
1436 12 : TRACE_REQUIRE_EQUAL(fps.front()->type(), emptyType);
1437 12 : TRACE_REQUIRE_EQUAL(fps.front()->size(), artdaq::detail::RawFragmentHeader::num_words());
1438 3 : fps.pop_front();
1439 : }
1440 4 : TRACE_REQUIRE_EQUAL(ids[1], 1);
1441 4 : TRACE_REQUIRE_EQUAL(ids[2], 1);
1442 4 : TRACE_REQUIRE_EQUAL(ids[3], 1);
1443 1 : ids.clear();
1444 4 : for (auto ii = 0; ii < 3; ++ii)
1445 : {
1446 3 : ids[fps.front()->fragmentID()]++;
1447 12 : TRACE_REQUIRE_EQUAL(fps.front()->timestamp(), 7);
1448 12 : TRACE_REQUIRE_EQUAL(fps.front()->sequenceID(), 4);
1449 3 : type = artdaq::Fragment::ContainerFragmentType;
1450 12 : TRACE_REQUIRE_EQUAL(fps.front()->type(), type);
1451 3 : auto cf3 = artdaq::ContainerFragment(*fps.front());
1452 12 : TRACE_REQUIRE_EQUAL(cf3.block_count(), 2);
1453 12 : TRACE_REQUIRE_EQUAL(cf3.missing_data(), false);
1454 3 : type = artdaq::Fragment::FirstUserFragmentType;
1455 12 : TRACE_REQUIRE_EQUAL(cf3.fragment_type(), type);
1456 3 : fps.pop_front();
1457 3 : }
1458 4 : TRACE_REQUIRE_EQUAL(ids[1], 1);
1459 4 : TRACE_REQUIRE_EQUAL(ids[2], 1);
1460 4 : TRACE_REQUIRE_EQUAL(ids[3], 1);
1461 1 : ids.clear();
1462 :
1463 1 : fps.clear();
1464 4 : TRACE_REQUIRE_EQUAL(fp.GetNextSequenceID(), 5);
1465 :
1466 3 : TLOG(TLVL_INFO) << "BufferMode_MultipleIDs test case END";
1467 1 : }
1468 :
// Test case: FragmentBuffer configured with request_mode "buffer", circular_buffer_mode = true,
// fragment IDs {1, 2, 3} and data_buffer_depth_fragments = 3.
// Every request is expected to produce one Fragment per configured fragment ID; the `ids` map
// counts how many Fragments were seen for each ID and is checked after every request.
1469 2 : BOOST_AUTO_TEST_CASE(CircularBufferMode_MultipleIDs)
1470 : {
1471 1 : artdaq::configureMessageFacility("FragmentBuffer_t", true, MESSAGEFACILITY_DEBUG);
1472 3 : TLOG(TLVL_INFO) << "CircularBufferMode_MultipleIDs test case BEGIN";
1473 1 : fhicl::ParameterSet ps;
1474 4 : ps.put<std::vector<int>>("fragment_ids", {1, 2, 3});
1475 2 : ps.put<artdaq::Fragment::timestamp_t>("request_window_offset", 0);
1476 2 : ps.put<artdaq::Fragment::timestamp_t>("request_window_width", 0);
1477 2 : ps.put<bool>("receive_requests", true);
1478 2 : ps.put<bool>("circular_buffer_mode", true);
1479 3 : ps.put<int>("data_buffer_depth_fragments", 3);
1480 3 : ps.put<std::string>("request_mode", "buffer");
1481 :
1482 1 : auto buffer = std::make_shared<artdaq::RequestBuffer>();
1483 1 : buffer->setRunning(true);
1484 1 : artdaqtest::FragmentBufferTestGenerator gen(ps);
1485 1 : artdaq::FragmentBuffer fp(ps);
1486 1 : fp.SetRequestBuffer(buffer);
1487 :
// Step 1: request (sequence 1, timestamp 1) plus one generated Fragment set.
// Expect three ContainerFragments, each holding exactly one user Fragment.
1488 1 : buffer->push(1, 1);
1489 1 : fp.AddFragmentsToBuffer(gen.Generate(1));
1490 :
1491 4 : TRACE_REQUIRE_EQUAL(fp.GetNextSequenceID(), 1);
1492 :
1493 1 : artdaq::FragmentPtrs fps;
1494 1 : std::map<artdaq::Fragment::fragment_id_t, size_t> ids;
1495 1 : auto sts = fp.applyRequests(fps);
1496 1 : auto type = artdaq::Fragment::ContainerFragmentType;
1497 4 : TRACE_REQUIRE_EQUAL(sts, true);
1498 4 : TRACE_REQUIRE_EQUAL(fps.size(), 3u);
1499 4 : while (fps.size() > 0)
1500 : {
1501 3 : ids[fps.front()->fragmentID()]++;
1502 12 : TRACE_REQUIRE_EQUAL(fps.front()->timestamp(), 1);
1503 12 : TRACE_REQUIRE_EQUAL(fps.front()->sequenceID(), 1);
1504 3 : type = artdaq::Fragment::ContainerFragmentType;
1505 12 : TRACE_REQUIRE_EQUAL(fps.front()->type(), type);
// Size guard before the Fragment is wrapped in a ContainerFragment below.
1506 3 : BOOST_REQUIRE_GE(fps.front()->sizeBytes(), 2 * sizeof(artdaq::detail::RawFragmentHeader) + sizeof(artdaq::ContainerFragment::Metadata));
1507 3 : auto cf = artdaq::ContainerFragment(*fps.front());
1508 12 : TRACE_REQUIRE_EQUAL(cf.block_count(), 1);
1509 12 : TRACE_REQUIRE_EQUAL(cf.missing_data(), false);
1510 3 : type = artdaq::Fragment::FirstUserFragmentType;
1511 12 : TRACE_REQUIRE_EQUAL(cf.fragment_type(), type);
1512 3 : fps.pop_front();
1513 3 : }
1514 4 : TRACE_REQUIRE_EQUAL(ids[1], 1);
1515 4 : TRACE_REQUIRE_EQUAL(ids[2], 1);
1516 4 : TRACE_REQUIRE_EQUAL(ids[3], 1);
1517 1 : ids.clear();
1518 :
1519 4 : TRACE_REQUIRE_EQUAL(fp.GetNextSequenceID(), 2);
1520 1 : fps.clear();
1521 :
// Step 2: request (2, 5) with no new data added since the previous request.
// Expect three empty containers (block_count 0, EmptyFragmentType) that are NOT flagged as missing data.
1522 1 : buffer->push(2, 5);
1523 :
1524 1 : sts = fp.applyRequests(fps);
1525 4 : TRACE_REQUIRE_EQUAL(sts, true);
1526 4 : TRACE_REQUIRE_EQUAL(fps.size(), 3u);
1527 4 : while (fps.size() > 0)
1528 : {
1529 3 : ids[fps.front()->fragmentID()]++;
1530 12 : TRACE_REQUIRE_EQUAL(fps.front()->timestamp(), 5);
1531 12 : TRACE_REQUIRE_EQUAL(fps.front()->sequenceID(), 2);
1532 3 : type = artdaq::Fragment::ContainerFragmentType;
1533 12 : TRACE_REQUIRE_EQUAL(fps.front()->type(), type);
1534 3 : auto cf = artdaq::ContainerFragment(*fps.front());
1535 12 : TRACE_REQUIRE_EQUAL(cf.block_count(), 0);
1536 12 : TRACE_REQUIRE_EQUAL(cf.missing_data(), false);
1537 3 : type = artdaq::Fragment::EmptyFragmentType;
1538 12 : TRACE_REQUIRE_EQUAL(cf.fragment_type(), type);
1539 3 : fps.pop_front();
1540 3 : }
1541 4 : TRACE_REQUIRE_EQUAL(ids[1], 1);
1542 4 : TRACE_REQUIRE_EQUAL(ids[2], 1);
1543 4 : TRACE_REQUIRE_EQUAL(ids[3], 1);
1544 1 : ids.clear();
1545 :
1546 4 : TRACE_REQUIRE_EQUAL(fp.GetNextSequenceID(), 3);
1547 1 : fps.clear();
1548 :
// Step 3: add data via gen.Generate(3), then push request 4 (sequence 3 is skipped).
// Expect six Fragments: first three header-only EmptyFragmentType Fragments for sequence 3
// (timestamp InvalidTimestamp), then three containers for sequence 4 with 3 blocks each.
1549 1 : fp.AddFragmentsToBuffer(gen.Generate(3));
1550 :
1551 1 : buffer->push(4, 7);
1552 :
1553 1 : sts = fp.applyRequests(fps);
1554 4 : TRACE_REQUIRE_EQUAL(sts, true);
// NOTE(review): this size check uses a signed literal (6) while the other size checks here use 3u.
1555 4 : TRACE_REQUIRE_EQUAL(fps.size(), 6);
1556 :
1557 1 : auto ts = artdaq::Fragment::InvalidTimestamp;
1558 1 : auto emptyType = artdaq::Fragment::EmptyFragmentType;
1559 4 : for (auto ii = 0; ii < 3; ++ii)
1560 : {
1561 3 : ids[fps.front()->fragmentID()]++;
1562 12 : TRACE_REQUIRE_EQUAL(fps.front()->timestamp(), ts);
1563 12 : TRACE_REQUIRE_EQUAL(fps.front()->sequenceID(), 3);
1564 12 : TRACE_REQUIRE_EQUAL(fps.front()->type(), emptyType);
1565 12 : TRACE_REQUIRE_EQUAL(fps.front()->size(), artdaq::detail::RawFragmentHeader::num_words());
1566 3 : fps.pop_front();
1567 : }
1568 4 : TRACE_REQUIRE_EQUAL(ids[1], 1);
1569 4 : TRACE_REQUIRE_EQUAL(ids[2], 1);
1570 4 : TRACE_REQUIRE_EQUAL(ids[3], 1);
1571 1 : ids.clear();
1572 4 : for (auto ii = 0; ii < 3; ++ii)
1573 : {
1574 3 : ids[fps.front()->fragmentID()]++;
1575 12 : TRACE_REQUIRE_EQUAL(fps.front()->timestamp(), 7);
1576 12 : TRACE_REQUIRE_EQUAL(fps.front()->sequenceID(), 4);
1577 3 : type = artdaq::Fragment::ContainerFragmentType;
1578 12 : TRACE_REQUIRE_EQUAL(fps.front()->type(), type);
1579 3 : auto cf3 = artdaq::ContainerFragment(*fps.front());
1580 12 : TRACE_REQUIRE_EQUAL(cf3.block_count(), 3);
1581 12 : TRACE_REQUIRE_EQUAL(cf3.missing_data(), false);
1582 3 : type = artdaq::Fragment::FirstUserFragmentType;
1583 12 : TRACE_REQUIRE_EQUAL(cf3.fragment_type(), type);
1584 3 : fps.pop_front();
1585 3 : }
1586 4 : TRACE_REQUIRE_EQUAL(ids[1], 1);
1587 4 : TRACE_REQUIRE_EQUAL(ids[2], 1);
1588 4 : TRACE_REQUIRE_EQUAL(ids[3], 1);
1589 1 : ids.clear();
1590 :
1591 1 : fps.clear();
1592 4 : TRACE_REQUIRE_EQUAL(fp.GetNextSequenceID(), 5);
1593 :
// Step 4: add data via gen.Generate(5) with a configured depth of 3, then request (5, 8).
// Each container is expected to hold 3 blocks with timestamps 7, 8 and 9.
// Presumably the generator's timestamps increase by one per Fragment and only the newest
// three are retained by the circular buffer - confirm against the generator/FragmentBuffer.
1594 1 : fp.AddFragmentsToBuffer(gen.Generate(5));
1595 :
1596 1 : buffer->push(5, 8);
1597 :
1598 1 : sts = fp.applyRequests(fps);
1599 4 : TRACE_REQUIRE_EQUAL(sts, true);
1600 4 : TRACE_REQUIRE_EQUAL(fps.size(), 3u);
1601 4 : while (fps.size() > 0)
1602 : {
1603 3 : ids[fps.front()->fragmentID()]++;
1604 12 : TRACE_REQUIRE_EQUAL(fps.front()->timestamp(), 8);
1605 12 : TRACE_REQUIRE_EQUAL(fps.front()->sequenceID(), 5);
1606 3 : type = artdaq::Fragment::ContainerFragmentType;
1607 12 : TRACE_REQUIRE_EQUAL(fps.front()->type(), type);
1608 3 : auto cf = artdaq::ContainerFragment(*fps.front());
1609 12 : TRACE_REQUIRE_EQUAL(cf.block_count(), 3);
1610 12 : TRACE_REQUIRE_EQUAL(cf.missing_data(), false);
1611 3 : type = artdaq::Fragment::FirstUserFragmentType;
1612 12 : TRACE_REQUIRE_EQUAL(cf.fragment_type(), type);
1613 12 : TRACE_REQUIRE_EQUAL(cf.at(0)->timestamp(), 7);
1614 12 : TRACE_REQUIRE_EQUAL(cf.at(1)->timestamp(), 8);
1615 12 : TRACE_REQUIRE_EQUAL(cf.at(2)->timestamp(), 9);
1616 3 : fps.pop_front();
1617 3 : }
1618 4 : TRACE_REQUIRE_EQUAL(ids[1], 1);
1619 4 : TRACE_REQUIRE_EQUAL(ids[2], 1);
1620 4 : TRACE_REQUIRE_EQUAL(ids[3], 1);
1621 1 : ids.clear();
1622 :
1623 1 : fps.clear();
1624 4 : TRACE_REQUIRE_EQUAL(fp.GetNextSequenceID(), 6);
1625 :
1626 3 : TLOG(TLVL_INFO) << "CircularBufferMode_MultipleIDs test case END";
1627 1 : }
1628 :
// Test case: FragmentBuffer configured with request_mode "window", circular_buffer_mode = true,
// fragment IDs {1, 2, 3}, data_buffer_depth_fragments = 5 and both
// missing_request_window_timeout_us and window_close_timeout_us set to 500000.
// Walks through: a satisfied request, a request that arrives before its data, a request that
// times out, a request already passed by data-taking, out-of-order requests, and a request
// whose data arrives for only a subset of the fragment IDs at a time.
1629 2 : BOOST_AUTO_TEST_CASE(WindowMode_Function_MultipleIDs)
1630 : {
1631 1 : artdaq::configureMessageFacility("FragmentBuffer_t", true, MESSAGEFACILITY_DEBUG);
1632 3 : TLOG(TLVL_INFO) << "WindowMode_Function_MultipleIDs test case BEGIN";
1633 1 : fhicl::ParameterSet ps;
1634 4 : ps.put<std::vector<int>>("fragment_ids", {1, 2, 3});
1635 2 : ps.put<artdaq::Fragment::timestamp_t>("request_window_offset", 0);
1636 2 : ps.put<artdaq::Fragment::timestamp_t>("request_window_width", 0);
1637 2 : ps.put<size_t>("data_buffer_depth_fragments", 5);
1638 2 : ps.put<bool>("circular_buffer_mode", true);
1639 3 : ps.put<bool>("receive_requests", true);
1640 3 : ps.put<std::string>("request_mode", "window");
1641 2 : ps.put<size_t>("missing_request_window_timeout_us", 500000);
1642 2 : ps.put<size_t>("window_close_timeout_us", 500000);
1643 :
1644 1 : auto buffer = std::make_shared<artdaq::RequestBuffer>();
1645 1 : buffer->setRunning(true);
1646 1 : artdaqtest::FragmentBufferTestGenerator gen(ps);
1647 1 : artdaq::FragmentBuffer fp(ps);
1648 1 : fp.SetRequestBuffer(buffer);
1649 :
// Request (1, 1) with matching data already buffered: one single-block container per fragment ID.
1650 1 : buffer->push(1, 1);
1651 1 : fp.AddFragmentsToBuffer(gen.Generate(1));
1652 :
1653 4 : TRACE_REQUIRE_EQUAL(fp.GetNextSequenceID(), 1);
1654 :
1655 1 : artdaq::FragmentPtrs fps;
1656 1 : std::map<artdaq::Fragment::fragment_id_t, size_t> ids;
1657 1 : auto sts = fp.applyRequests(fps);
1658 1 : auto type = artdaq::Fragment::ContainerFragmentType;
1659 4 : TRACE_REQUIRE_EQUAL(fp.GetNextSequenceID(), 2);
1660 4 : TRACE_REQUIRE_EQUAL(sts, true);
1661 4 : TRACE_REQUIRE_EQUAL(fps.size(), 3u);
1662 4 : while (fps.size() > 0)
1663 : {
1664 3 : ids[fps.front()->fragmentID()]++;
1665 12 : TRACE_REQUIRE_EQUAL(fps.front()->timestamp(), 1);
1666 12 : TRACE_REQUIRE_EQUAL(fps.front()->sequenceID(), 1);
1667 3 : type = artdaq::Fragment::ContainerFragmentType;
1668 12 : TRACE_REQUIRE_EQUAL(fps.front()->type(), type);
1669 3 : BOOST_REQUIRE_GE(fps.front()->sizeBytes(), 2 * sizeof(artdaq::detail::RawFragmentHeader) + sizeof(artdaq::ContainerFragment::Metadata));
1670 3 : auto cf = artdaq::ContainerFragment(*fps.front());
1671 12 : TRACE_REQUIRE_EQUAL(cf.block_count(), 1);
1672 12 : TRACE_REQUIRE_EQUAL(cf.missing_data(), false);
1673 3 : type = artdaq::Fragment::FirstUserFragmentType;
1674 12 : TRACE_REQUIRE_EQUAL(cf.fragment_type(), type);
1675 3 : fps.pop_front();
1676 3 : }
1677 4 : TRACE_REQUIRE_EQUAL(ids[1], 1);
1678 4 : TRACE_REQUIRE_EQUAL(ids[2], 1);
1679 4 : TRACE_REQUIRE_EQUAL(ids[3], 1);
1680 1 : ids.clear();
1681 1 : fps.clear();
1682 :
1683 : // No data for request
1684 1 : buffer->push(2, 2);
1685 :
// With nothing buffered for timestamp 2 the request stays pending: no output, sequence ID unchanged.
1686 1 : sts = fp.applyRequests(fps);
1687 4 : TRACE_REQUIRE_EQUAL(fp.GetNextSequenceID(), 2);
1688 4 : TRACE_REQUIRE_EQUAL(sts, true);
1689 4 : TRACE_REQUIRE_EQUAL(fps.size(), 0);
1690 :
// Once data is added, the pending request 2 is expected to be satisfied.
1691 1 : fp.AddFragmentsToBuffer(gen.Generate(1));
1692 :
1693 1 : sts = fp.applyRequests(fps);
1694 4 : TRACE_REQUIRE_EQUAL(fp.GetNextSequenceID(), 3);
1695 4 : TRACE_REQUIRE_EQUAL(sts, true);
1696 4 : TRACE_REQUIRE_EQUAL(fps.size(), 3u);
1697 4 : while (fps.size() > 0)
1698 : {
1699 3 : ids[fps.front()->fragmentID()]++;
1700 12 : TRACE_REQUIRE_EQUAL(fps.front()->timestamp(), 2);
1701 12 : TRACE_REQUIRE_EQUAL(fps.front()->sequenceID(), 2);
1702 3 : type = artdaq::Fragment::ContainerFragmentType;
1703 12 : TRACE_REQUIRE_EQUAL(fps.front()->type(), type);
1704 3 : auto cf = artdaq::ContainerFragment(*fps.front());
1705 12 : TRACE_REQUIRE_EQUAL(cf.block_count(), 1);
1706 12 : TRACE_REQUIRE_EQUAL(cf.missing_data(), false);
1707 3 : type = artdaq::Fragment::FirstUserFragmentType;
1708 12 : TRACE_REQUIRE_EQUAL(cf.fragment_type(), type);
1709 3 : fps.pop_front();
1710 3 : }
1711 4 : TRACE_REQUIRE_EQUAL(ids[1], 1);
1712 4 : TRACE_REQUIRE_EQUAL(ids[2], 1);
1713 4 : TRACE_REQUIRE_EQUAL(ids[3], 1);
1714 1 : ids.clear();
1715 1 : fps.clear();
1716 :
1717 : // Request Timeout
1718 1 : buffer->push(4, 3);
1719 :
1720 1 : sts = fp.applyRequests(fps);
1721 4 : TRACE_REQUIRE_EQUAL(sts, true);
1722 4 : TRACE_REQUIRE_EQUAL(fps.size(), 0);
1723 4 : TRACE_REQUIRE_EQUAL(fp.GetNextSequenceID(), 3);
1724 :
// Sleep 1.5 s, longer than the configured 500000 us window_close_timeout_us, so that the
// next applyRequests is expected to emit the timed-out window (checked below: missing_data == true).
1725 1 : usleep(1500000);
1726 1 : sts = fp.applyRequests(fps);
1727 4 : TRACE_REQUIRE_EQUAL(sts, true);
1728 4 : TRACE_REQUIRE_EQUAL(fps.size(), 3);
1729 :
1730 : // Also, missing request timeout
1731 1 : auto list = fp.GetSentWindowList(1);
1732 4 : TRACE_REQUIRE_EQUAL(list.size(), 1);
1733 4 : TRACE_REQUIRE_EQUAL(list.begin()->first, 4);
1734 4 : TRACE_REQUIRE_EQUAL(fp.GetNextSequenceID(), 3);
1735 :
// NOTE(review): presumably this second sleep lets missing_request_window_timeout_us elapse so
// that a later applyRequests gives up on sequence 3 - confirm against FragmentBuffer.
1736 1 : usleep(1500000);
1737 :
1738 4 : while (fps.size() > 0)
1739 : {
1740 3 : ids[fps.front()->fragmentID()]++;
1741 12 : TRACE_REQUIRE_EQUAL(fps.front()->timestamp(), 3);
1742 12 : TRACE_REQUIRE_EQUAL(fps.front()->sequenceID(), 4);
1743 3 : type = artdaq::Fragment::ContainerFragmentType;
1744 12 : TRACE_REQUIRE_EQUAL(fps.front()->type(), type);
1745 3 : auto cf = artdaq::ContainerFragment(*fps.front());
1746 12 : TRACE_REQUIRE_EQUAL(cf.block_count(), 0);
1747 12 : TRACE_REQUIRE_EQUAL(cf.missing_data(), true);
1748 3 : type = artdaq::Fragment::EmptyFragmentType;
1749 12 : TRACE_REQUIRE_EQUAL(cf.fragment_type(), type);
1750 3 : fps.pop_front();
1751 3 : }
1752 4 : TRACE_REQUIRE_EQUAL(ids[1], 1);
1753 4 : TRACE_REQUIRE_EQUAL(ids[2], 1);
1754 4 : TRACE_REQUIRE_EQUAL(ids[3], 1);
1755 1 : ids.clear();
1756 :
1757 4 : TRACE_REQUIRE_EQUAL(fp.GetNextSequenceID(), 3);
1758 1 : fps.clear();
1759 :
1760 : // Data-taking has passed request
1761 1 : fp.AddFragmentsToBuffer(gen.Generate(12));
1762 :
1763 1 : buffer->push(5, 4);
1764 :
1765 1 : list = fp.GetSentWindowList(1); // Out-of-order list is only updated in getNext calls
1766 4 : TRACE_REQUIRE_EQUAL(list.size(), 1);
1767 1 : sts = fp.applyRequests(fps);
1768 1 : list = fp.GetSentWindowList(1);
1769 4 : TRACE_REQUIRE_EQUAL(fp.GetNextSequenceID(), 6);
1770 4 : TRACE_REQUIRE_EQUAL(list.size(), 0);
1771 4 : TRACE_REQUIRE_EQUAL(sts, true);
1772 4 : TRACE_REQUIRE_EQUAL(fps.size(), 3);
// The requested timestamp (4) is no longer available: expect empty containers flagged missing_data.
1773 4 : while (fps.size() > 0)
1774 : {
1775 3 : ids[fps.front()->fragmentID()]++;
1776 12 : TRACE_REQUIRE_EQUAL(fps.front()->timestamp(), 4);
1777 12 : TRACE_REQUIRE_EQUAL(fps.front()->sequenceID(), 5);
1778 3 : type = artdaq::Fragment::ContainerFragmentType;
1779 12 : TRACE_REQUIRE_EQUAL(fps.front()->type(), type);
1780 3 : auto cf = artdaq::ContainerFragment(*fps.front());
1781 12 : TRACE_REQUIRE_EQUAL(cf.block_count(), 0);
1782 12 : TRACE_REQUIRE_EQUAL(cf.missing_data(), true);
1783 3 : type = artdaq::Fragment::EmptyFragmentType;
1784 12 : TRACE_REQUIRE_EQUAL(cf.fragment_type(), type);
1785 3 : fps.pop_front();
1786 3 : }
1787 4 : TRACE_REQUIRE_EQUAL(ids[1], 1);
1788 4 : TRACE_REQUIRE_EQUAL(ids[2], 1);
1789 4 : TRACE_REQUIRE_EQUAL(ids[3], 1);
1790 1 : ids.clear();
1791 1 : fps.clear();
1792 :
1793 : // Out-of-order windows
1794 1 : buffer->push(7, 13);
1795 :
// Request 7 is served ahead of request 6: the next sequence ID stays at 6 and
// sequence 7 is recorded in the sent-window list until 6 has been handled.
1796 1 : sts = fp.applyRequests(fps);
1797 4 : TRACE_REQUIRE_EQUAL(sts, true);
1798 4 : TRACE_REQUIRE_EQUAL(fps.size(), 3);
1799 4 : while (fps.size() > 0)
1800 : {
1801 3 : ids[fps.front()->fragmentID()]++;
1802 12 : TRACE_REQUIRE_EQUAL(fps.front()->timestamp(), 13);
1803 12 : TRACE_REQUIRE_EQUAL(fps.front()->sequenceID(), 7);
1804 3 : type = artdaq::Fragment::ContainerFragmentType;
1805 12 : TRACE_REQUIRE_EQUAL(fps.front()->type(), type);
1806 3 : auto cf = artdaq::ContainerFragment(*fps.front());
1807 12 : TRACE_REQUIRE_EQUAL(cf.block_count(), 1);
1808 12 : TRACE_REQUIRE_EQUAL(cf.missing_data(), false);
1809 3 : type = artdaq::Fragment::FirstUserFragmentType;
1810 12 : TRACE_REQUIRE_EQUAL(cf.fragment_type(), type);
1811 3 : fps.pop_front();
1812 3 : }
1813 4 : TRACE_REQUIRE_EQUAL(ids[1], 1);
1814 4 : TRACE_REQUIRE_EQUAL(ids[2], 1);
1815 4 : TRACE_REQUIRE_EQUAL(ids[3], 1);
1816 1 : ids.clear();
1817 4 : TRACE_REQUIRE_EQUAL(fp.GetNextSequenceID(), 6);
1818 1 : fps.clear();
1819 :
1820 1 : list = fp.GetSentWindowList(1);
1821 4 : TRACE_REQUIRE_EQUAL(list.size(), 1);
1822 4 : TRACE_REQUIRE_EQUAL(list.begin()->first, 7);
1823 :
// The late request 6 arrives: after it is served the next sequence ID jumps to 8
// and the sent-window list is expected to be empty again.
1824 1 : buffer->push(6, 12);
1825 :
1826 1 : sts = fp.applyRequests(fps);
1827 4 : TRACE_REQUIRE_EQUAL(sts, true);
1828 4 : TRACE_REQUIRE_EQUAL(fps.size(), 3);
1829 4 : while (fps.size() > 0)
1830 : {
1831 3 : ids[fps.front()->fragmentID()]++;
1832 12 : TRACE_REQUIRE_EQUAL(fps.front()->timestamp(), 12);
1833 12 : TRACE_REQUIRE_EQUAL(fps.front()->sequenceID(), 6);
1834 3 : type = artdaq::Fragment::ContainerFragmentType;
1835 12 : TRACE_REQUIRE_EQUAL(fps.front()->type(), type);
1836 3 : auto cf = artdaq::ContainerFragment(*fps.front());
1837 12 : TRACE_REQUIRE_EQUAL(cf.block_count(), 1);
1838 12 : TRACE_REQUIRE_EQUAL(cf.missing_data(), false);
1839 3 : type = artdaq::Fragment::FirstUserFragmentType;
1840 12 : TRACE_REQUIRE_EQUAL(cf.fragment_type(), type);
1841 3 : fps.pop_front();
1842 3 : }
1843 4 : TRACE_REQUIRE_EQUAL(ids[1], 1);
1844 4 : TRACE_REQUIRE_EQUAL(ids[2], 1);
1845 4 : TRACE_REQUIRE_EQUAL(ids[3], 1);
1846 1 : ids.clear();
1847 1 : fps.clear();
1848 4 : TRACE_REQUIRE_EQUAL(fp.GetNextSequenceID(), 8);
1849 :
1850 1 : list = fp.GetSentWindowList(1);
1851 4 : TRACE_REQUIRE_EQUAL(list.size(), 0);
1852 :
// Partial data: only fragment IDs 1 and 2 receive data for request (8, 15).
// Two containers are returned now and the next sequence ID stays at 8 until ID 3 is served.
1853 2 : fp.AddFragmentsToBuffer(gen.Generate(1, {1, 2}));
1854 :
1855 1 : buffer->push(8, 15);
1856 :
1857 1 : sts = fp.applyRequests(fps);
1858 4 : TRACE_REQUIRE_EQUAL(sts, true);
1859 4 : TRACE_REQUIRE_EQUAL(fps.size(), 2);
1860 3 : while (fps.size() > 0)
1861 : {
1862 2 : ids[fps.front()->fragmentID()]++;
1863 8 : TRACE_REQUIRE_EQUAL(fps.front()->timestamp(), 15);
1864 8 : TRACE_REQUIRE_EQUAL(fps.front()->sequenceID(), 8);
1865 2 : type = artdaq::Fragment::ContainerFragmentType;
1866 8 : TRACE_REQUIRE_EQUAL(fps.front()->type(), type);
1867 2 : auto cf = artdaq::ContainerFragment(*fps.front());
1868 8 : TRACE_REQUIRE_EQUAL(cf.block_count(), 1);
1869 8 : TRACE_REQUIRE_EQUAL(cf.missing_data(), false);
1870 2 : type = artdaq::Fragment::FirstUserFragmentType;
1871 8 : TRACE_REQUIRE_EQUAL(cf.fragment_type(), type);
1872 2 : fps.pop_front();
1873 2 : }
1874 4 : TRACE_REQUIRE_EQUAL(ids[1], 1);
1875 4 : TRACE_REQUIRE_EQUAL(ids[2], 1);
1876 4 : TRACE_REQUIRE_EQUAL(ids[3], 0);
1877 1 : ids.clear();
1878 1 : fps.clear();
1879 4 : TRACE_REQUIRE_EQUAL(fp.GetNextSequenceID(), 8);
1880 :
// Rewind the generator's timestamp before generating for ID 3 alone; the Fragment returned
// below is expected to match the pending request (timestamp 15, sequence 8).
1881 1 : gen.setTimestamp(14); // Reset timestamp
1882 2 : fp.AddFragmentsToBuffer(gen.Generate(1, {3}));
1883 :
1884 1 : sts = fp.applyRequests(fps);
1885 4 : TRACE_REQUIRE_EQUAL(sts, true);
1886 4 : TRACE_REQUIRE_EQUAL(fps.size(), 1);
1887 2 : while (fps.size() > 0)
1888 : {
1889 1 : ids[fps.front()->fragmentID()]++;
1890 4 : TRACE_REQUIRE_EQUAL(fps.front()->timestamp(), 15);
1891 4 : TRACE_REQUIRE_EQUAL(fps.front()->sequenceID(), 8);
1892 1 : type = artdaq::Fragment::ContainerFragmentType;
1893 4 : TRACE_REQUIRE_EQUAL(fps.front()->type(), type);
1894 1 : auto cf = artdaq::ContainerFragment(*fps.front());
1895 4 : TRACE_REQUIRE_EQUAL(cf.block_count(), 1);
1896 4 : TRACE_REQUIRE_EQUAL(cf.missing_data(), false);
1897 1 : type = artdaq::Fragment::FirstUserFragmentType;
1898 4 : TRACE_REQUIRE_EQUAL(cf.fragment_type(), type);
1899 1 : fps.pop_front();
1900 1 : }
1901 4 : TRACE_REQUIRE_EQUAL(ids[1], 0);
1902 4 : TRACE_REQUIRE_EQUAL(ids[2], 0);
1903 4 : TRACE_REQUIRE_EQUAL(ids[3], 1);
1904 1 : ids.clear();
1905 1 : fps.clear();
1906 4 : TRACE_REQUIRE_EQUAL(fp.GetNextSequenceID(), 9);
1907 :
1908 3 : TLOG(TLVL_INFO) << "WindowMode_Function_MultipleIDs test case END";
1909 1 : }
1910 :
// Test case: FragmentBuffer configured with request_mode "SequenceID" and fragment IDs {1, 2, 3}.
// Unlike the buffer/window tests, the returned Fragments are checked to be of
// FirstUserFragmentType directly (no ContainerFragment is unpacked here), and their
// timestamps are those of the generated data rather than those of the requests.
1911 2 : BOOST_AUTO_TEST_CASE(SequenceIDMode_MultipleIDs)
1912 : {
1913 1 : artdaq::configureMessageFacility("FragmentBuffer_t", true, MESSAGEFACILITY_DEBUG);
1914 3 : TLOG(TLVL_INFO) << "SequenceIDMode_MultipleIDs test case BEGIN";
1915 1 : fhicl::ParameterSet ps;
1916 4 : ps.put<std::vector<int>>("fragment_ids", {1, 2, 3});
1917 2 : ps.put<artdaq::Fragment::timestamp_t>("request_window_offset", 0);
1918 2 : ps.put<artdaq::Fragment::timestamp_t>("request_window_width", 0);
1919 2 : ps.put<bool>("separate_data_thread", true);
1920 2 : ps.put<bool>("separate_monitoring_thread", false);
1921 2 : ps.put<bool>("receive_requests", true);
1922 3 : ps.put<int64_t>("hardware_poll_interval_us", 0);
1923 3 : ps.put<std::string>("request_mode", "SequenceID");
1924 :
1925 1 : auto buffer = std::make_shared<artdaq::RequestBuffer>();
1926 1 : buffer->setRunning(true);
1927 1 : artdaqtest::FragmentBufferTestGenerator gen(ps);
1928 1 : artdaq::FragmentBuffer fp(ps);
1929 1 : fp.SetRequestBuffer(buffer);
1930 :
1931 1 : buffer->push(1, 1);
1932 1 : fp.AddFragmentsToBuffer(gen.Generate(1));
1933 :
1934 4 : TRACE_REQUIRE_EQUAL(fp.GetNextSequenceID(), 1);
1935 :
1936 : // Test that Fragment with matching Sequence ID and timestamp is returned
1937 1 : artdaq::FragmentPtrs fps;
1938 1 : std::map<artdaq::Fragment::fragment_id_t, size_t> ids;
1939 1 : auto sts = fp.applyRequests(fps);
1940 1 : auto type = artdaq::Fragment::FirstUserFragmentType;
1941 4 : TRACE_REQUIRE_EQUAL(sts, true);
1942 4 : TRACE_REQUIRE_EQUAL(fps.size(), 3u);
1943 :
// Safe to take front() three times: the size was required to be exactly 3 above.
1944 4 : for (auto ii = 0; ii < 3; ++ii)
1945 : {
1946 3 : ids[fps.front()->fragmentID()]++;
1947 12 : TRACE_REQUIRE_EQUAL(fps.front()->timestamp(), 1);
1948 12 : TRACE_REQUIRE_EQUAL(fps.front()->sequenceID(), 1);
1949 12 : TRACE_REQUIRE_EQUAL(fps.front()->type(), type);
1950 3 : fps.pop_front();
1951 : }
1952 4 : TRACE_REQUIRE_EQUAL(ids[1], 1);
1953 4 : TRACE_REQUIRE_EQUAL(ids[2], 1);
1954 4 : TRACE_REQUIRE_EQUAL(ids[3], 1);
1955 1 : ids.clear();
1956 :
1957 4 : TRACE_REQUIRE_EQUAL(fp.GetNextSequenceID(), 2);
1958 1 : fps.clear();
1959 :
1960 : // Test that no Fragment is returned when one does not exist in the buffer
1961 1 : buffer->push(2, 5);
1962 :
1963 1 : sts = fp.applyRequests(fps);
1964 4 : TRACE_REQUIRE_EQUAL(sts, true);
1965 4 : TRACE_REQUIRE_EQUAL(fps.size(), 0u);
1966 :
1967 : // Test that Fragment with matching Sequence ID and non-matching timestamp is returned
1968 1 : fp.AddFragmentsToBuffer(gen.Generate(1));
1969 :
// The request carried timestamp 5, but the returned Fragments keep the data timestamp 2.
1970 1 : sts = fp.applyRequests(fps);
1971 4 : TRACE_REQUIRE_EQUAL(sts, true);
1972 4 : TRACE_REQUIRE_EQUAL(fps.size(), 3u);
1973 4 : for (auto ii = 0; ii < 3; ++ii)
1974 : {
1975 3 : ids[fps.front()->fragmentID()]++;
1976 12 : TRACE_REQUIRE_EQUAL(fps.front()->timestamp(), 2);
1977 12 : TRACE_REQUIRE_EQUAL(fps.front()->sequenceID(), 2);
1978 12 : TRACE_REQUIRE_EQUAL(fps.front()->type(), type);
1979 3 : fps.pop_front();
1980 : }
1981 4 : TRACE_REQUIRE_EQUAL(ids[1], 1);
1982 4 : TRACE_REQUIRE_EQUAL(ids[2], 1);
1983 4 : TRACE_REQUIRE_EQUAL(ids[3], 1);
1984 1 : ids.clear();
1985 :
1986 4 : TRACE_REQUIRE_EQUAL(fp.GetNextSequenceID(), 3);
1987 1 : fps.clear();
1988 :
1989 : // Test out-of-order requests, with non-matching timestamps
1990 1 : fp.AddFragmentsToBuffer(gen.Generate(2));
1991 :
// Request 4 is pushed before request 3: sequence 4 is served first and the
// next sequence ID stays at 3 until request 3 has been handled.
1992 1 : buffer->push(4, 7);
1993 :
1994 1 : sts = fp.applyRequests(fps);
1995 4 : TRACE_REQUIRE_EQUAL(fp.GetNextSequenceID(), 3);
1996 4 : TRACE_REQUIRE_EQUAL(sts, true);
1997 4 : TRACE_REQUIRE_EQUAL(fps.size(), 3u);
1998 4 : for (auto ii = 0; ii < 3; ++ii)
1999 : {
2000 3 : ids[fps.front()->fragmentID()]++;
2001 12 : TRACE_REQUIRE_EQUAL(fps.front()->timestamp(), 4);
2002 12 : TRACE_REQUIRE_EQUAL(fps.front()->sequenceID(), 4);
2003 12 : TRACE_REQUIRE_EQUAL(fps.front()->type(), type);
2004 3 : fps.pop_front();
2005 : }
2006 4 : TRACE_REQUIRE_EQUAL(ids[1], 1);
2007 4 : TRACE_REQUIRE_EQUAL(ids[2], 1);
2008 4 : TRACE_REQUIRE_EQUAL(ids[3], 1);
2009 1 : ids.clear();
2010 1 : fps.clear();
2011 :
// The late request 3 is now served; afterwards the next sequence ID jumps past 4 to 5.
2012 1 : buffer->push(3, 6);
2013 :
2014 1 : sts = fp.applyRequests(fps);
2015 4 : TRACE_REQUIRE_EQUAL(fp.GetNextSequenceID(), 5);
2016 4 : TRACE_REQUIRE_EQUAL(sts, true);
2017 4 : TRACE_REQUIRE_EQUAL(fps.size(), 3u);
2018 4 : for (auto ii = 0; ii < 3; ++ii)
2019 : {
2020 3 : ids[fps.front()->fragmentID()]++;
2021 12 : TRACE_REQUIRE_EQUAL(fps.front()->timestamp(), 3);
2022 12 : TRACE_REQUIRE_EQUAL(fps.front()->sequenceID(), 3);
2023 12 : TRACE_REQUIRE_EQUAL(fps.front()->type(), type);
2024 3 : fps.pop_front();
2025 : }
2026 4 : TRACE_REQUIRE_EQUAL(ids[1], 1);
2027 4 : TRACE_REQUIRE_EQUAL(ids[2], 1);
2028 4 : TRACE_REQUIRE_EQUAL(ids[3], 1);
2029 1 : ids.clear();
2030 :
2031 3 : TLOG(TLVL_INFO) << "SequenceIDMode_MultipleIDs test case END";
2032 1 : }
2033 :
// Test case: FragmentBuffer configured with request_mode "ignored" and a single fragment_id (1).
// Exercises the buffer across Reset(false) and Stop(): buffered data is returned regardless of
// the pushed request (53, 35), nothing is returned right after a reset, and applyRequests
// reports false once the buffer has been stopped.
2034 2 : BOOST_AUTO_TEST_CASE(IgnoreRequests_StateMachine)
2035 : {
2036 1 : artdaq::configureMessageFacility("FragmentBuffer_t", true, MESSAGEFACILITY_DEBUG);
2037 3 : TLOG(TLVL_INFO) << "IgnoreRequests_StateMachine test case BEGIN";
2038 1 : fhicl::ParameterSet ps;
2039 2 : ps.put<int>("fragment_id", 1);
2040 2 : ps.put<artdaq::Fragment::timestamp_t>("request_window_offset", 0);
2041 3 : ps.put<artdaq::Fragment::timestamp_t>("request_window_width", 0);
2042 3 : ps.put<std::string>("request_mode", "ignored");
2043 :
2044 1 : auto buffer = std::make_shared<artdaq::RequestBuffer>();
2045 1 : buffer->setRunning(true);
2046 :
2047 1 : artdaq::FragmentBuffer fp(ps);
2048 1 : fp.SetRequestBuffer(buffer);
2049 :
// The request's sequence ID / timestamp (53, 35) deliberately do not match the generated data.
2050 1 : buffer->push(53, 35);
2051 :
2052 1 : artdaqtest::FragmentBufferTestGenerator gen(ps);
2053 :
2054 1 : fp.AddFragmentsToBuffer(gen.Generate(1));
2055 :
2056 1 : artdaq::FragmentPtrs fps;
2057 1 : auto sts = fp.applyRequests(fps);
2058 :
// The Fragment comes back with the generator's own timestamp/sequence ID (1, 1), not the request's.
2059 4 : TRACE_REQUIRE_EQUAL(sts, true);
2060 4 : TRACE_REQUIRE_EQUAL(fps.size(), 1u);
2061 4 : TRACE_REQUIRE_EQUAL(fps.front()->fragmentID(), 1);
2062 4 : TRACE_REQUIRE_EQUAL(fps.front()->timestamp(), 1);
2063 4 : TRACE_REQUIRE_EQUAL(fps.front()->sequenceID(), 1);
2064 1 : fps.clear();
2065 :
// After Reset(false) with no new data, applyRequests still succeeds but returns nothing.
2066 1 : fp.Reset(false);
2067 1 : sts = fp.applyRequests(fps);
2068 :
2069 4 : TRACE_REQUIRE_EQUAL(sts, true);
2070 4 : TRACE_REQUIRE_EQUAL(fps.size(), 0u);
2071 :
2072 1 : fp.AddFragmentsToBuffer(gen.Generate(1));
2073 :
2074 1 : sts = fp.applyRequests(fps);
2075 :
2076 4 : TRACE_REQUIRE_EQUAL(sts, true);
2077 4 : TRACE_REQUIRE_EQUAL(fps.size(), 1u);
2078 4 : TRACE_REQUIRE_EQUAL(fps.front()->fragmentID(), 1);
2079 4 : TRACE_REQUIRE_EQUAL(fps.front()->timestamp(), 2);
2080 4 : TRACE_REQUIRE_EQUAL(fps.front()->sequenceID(), 2);
2081 1 : fps.clear();
2082 :
// After Stop(), data added to the buffer is not returned and applyRequests reports false.
2083 1 : fp.Stop();
2084 1 : fp.AddFragmentsToBuffer(gen.Generate(1));
2085 :
2086 1 : sts = fp.applyRequests(fps);
2087 :
2088 4 : TRACE_REQUIRE_EQUAL(sts, false);
2089 4 : TRACE_REQUIRE_EQUAL(fps.size(), 0u);
2090 :
2091 3 : TLOG(TLVL_INFO) << "IgnoreRequests_StateMachine test case END";
2092 1 : }
2093 :
// Test case: FragmentBuffer configured with request_mode "single" and a single fragment_id (1).
// Exercises the buffer across Reset(false), Stop() and RequestBuffer::setRunning(false),
// checking the Fragment returned (if any) and GetNextSequenceID() after each transition.
2094 2 : BOOST_AUTO_TEST_CASE(SingleMode_StateMachine)
2095 : {
2096 1 : artdaq::configureMessageFacility("FragmentBuffer_t", true, MESSAGEFACILITY_DEBUG);
2097 3 : TLOG(TLVL_INFO) << "SingleMode_StateMachine test case BEGIN";
2098 1 : fhicl::ParameterSet ps;
2099 2 : ps.put<int>("fragment_id", 1);
2100 2 : ps.put<artdaq::Fragment::timestamp_t>("request_window_offset", 0);
2101 2 : ps.put<artdaq::Fragment::timestamp_t>("request_window_width", 0);
2102 3 : ps.put<bool>("receive_requests", true);
2103 3 : ps.put<std::string>("request_mode", "single");
2104 :
2105 1 : auto type = artdaq::Fragment::FirstUserFragmentType;
2106 1 : auto buffer = std::make_shared<artdaq::RequestBuffer>();
2107 1 : buffer->setRunning(true);
2108 1 : artdaqtest::FragmentBufferTestGenerator gen(ps);
2109 1 : artdaq::FragmentBuffer fp(ps);
2110 1 : fp.SetRequestBuffer(buffer);
2111 :
// Running state: request (1, 1) with one buffered Fragment yields that Fragment.
2112 1 : buffer->push(1, 1);
2113 1 : fp.AddFragmentsToBuffer(gen.Generate(1));
2114 4 : TRACE_REQUIRE_EQUAL(fp.GetNextSequenceID(), 1);
2115 :
2116 1 : artdaq::FragmentPtrs fps;
2117 1 : auto sts = fp.applyRequests(fps);
2118 4 : TRACE_REQUIRE_EQUAL(sts, true);
2119 4 : TRACE_REQUIRE_EQUAL(fps.size(), 1u);
2120 4 : TRACE_REQUIRE_EQUAL(fps.front()->fragmentID(), 1);
2121 4 : TRACE_REQUIRE_EQUAL(fps.front()->timestamp(), 1);
2122 4 : TRACE_REQUIRE_EQUAL(fps.front()->sequenceID(), 1);
2123 4 : TRACE_REQUIRE_EQUAL(fps.front()->type(), type);
2124 4 : TRACE_REQUIRE_EQUAL(fp.GetNextSequenceID(), 2);
2125 1 : fps.clear();
2126 :
// Reset both the FragmentBuffer and the RequestBuffer: with no request pending nothing is returned.
2127 1 : fp.Reset(false);
2128 1 : buffer->reset();
2129 1 : sts = fp.applyRequests(fps);
2130 4 : TRACE_REQUIRE_EQUAL(sts, true);
2131 4 : TRACE_REQUIRE_EQUAL(fps.size(), 0u);
2132 :
// After the reset, sequence ID 1 is accepted again and the returned Fragment carries (1, 1).
2133 1 : buffer->push(1, 1);
2134 1 : fp.AddFragmentsToBuffer(gen.Generate(1));
2135 1 : sts = fp.applyRequests(fps);
2136 4 : TRACE_REQUIRE_EQUAL(sts, true);
2137 4 : TRACE_REQUIRE_EQUAL(fps.size(), 1u);
2138 4 : TRACE_REQUIRE_EQUAL(fps.front()->fragmentID(), 1);
2139 4 : TRACE_REQUIRE_EQUAL(fps.front()->timestamp(), 1);
2140 4 : TRACE_REQUIRE_EQUAL(fps.front()->sequenceID(), 1);
2141 4 : TRACE_REQUIRE_EQUAL(fps.front()->type(), type);
2142 4 : TRACE_REQUIRE_EQUAL(fp.GetNextSequenceID(), 2);
2143 1 : fps.clear();
2144 :
// After Stop() (request buffer still running) a pending request is still served without new
// data being added; the returned Fragment carries the request's timestamp and sequence ID (5, 2).
2145 1 : fp.Stop();
2146 1 : buffer->push(2, 5);
2147 1 : sts = fp.applyRequests(fps);
2148 4 : TRACE_REQUIRE_EQUAL(sts, true);
2149 4 : TRACE_REQUIRE_EQUAL(fps.size(), 1u);
2150 4 : TRACE_REQUIRE_EQUAL(fps.front()->fragmentID(), 1);
2151 4 : TRACE_REQUIRE_EQUAL(fps.front()->timestamp(), 5);
2152 4 : TRACE_REQUIRE_EQUAL(fps.front()->sequenceID(), 2);
2153 4 : TRACE_REQUIRE_EQUAL(fps.front()->type(), type);
2154 4 : TRACE_REQUIRE_EQUAL(fp.GetNextSequenceID(), 3);
2155 1 : fps.clear();
2156 :
// Once the request buffer is also marked not running, applyRequests reports false and returns nothing.
2157 1 : buffer->setRunning(false);
2158 1 : fp.AddFragmentsToBuffer(gen.Generate(2));
2159 1 : buffer->push(4, 7);
2160 :
2161 1 : sts = fp.applyRequests(fps);
2162 4 : TRACE_REQUIRE_EQUAL(sts, false);
2163 4 : TRACE_REQUIRE_EQUAL(fps.size(), 0);
2164 :
2165 3 : TLOG(TLVL_INFO) << "SingleMode_StateMachine test case END";
2166 1 : }
2167 :
// Test case: throughput exercise for request_mode "window" with circular_buffer_mode = false
// and data_buffer_depth_fragments = 2 * RATE_TEST_COUNT.
// Pushes RATE_TEST_COUNT requests, adds RATE_TEST_COUNT Fragments in one call, applies the
// requests, and logs the elapsed time and rate of each phase; then repeats the request push
// and adds another RATE_TEST_COUNT Fragments one call at a time.
// Only the first applyRequests result is asserted; the remaining phases are timed and logged.
2168 2 : BOOST_AUTO_TEST_CASE(WindowMode_RateTests)
2169 : {
2170 1 : artdaq::configureMessageFacility("FragmentBuffer_t", true, MESSAGEFACILITY_DEBUG);
2171 3 : TLOG(TLVL_INFO) << "WindowMode_RateTests test case BEGIN";
2172 1 : fhicl::ParameterSet ps;
2173 2 : ps.put<int>("fragment_id", 1);
2174 :
2175 2 : ps.put<artdaq::Fragment::timestamp_t>("request_window_offset", 0);
2176 2 : ps.put<artdaq::Fragment::timestamp_t>("request_window_width", 0);
2177 2 : ps.put<size_t>("data_buffer_depth_fragments", 2 * RATE_TEST_COUNT);
2178 2 : ps.put<bool>("circular_buffer_mode", false);
2179 3 : ps.put<bool>("receive_requests", true);
2180 3 : ps.put<std::string>("request_mode", "window");
2181 2 : ps.put<size_t>("missing_request_window_timeout_us", 500000);
2182 2 : ps.put<size_t>("window_close_timeout_us", 500000);
2183 :
2184 1 : auto buffer = std::make_shared<artdaq::RequestBuffer>();
2185 1 : buffer->setRunning(true);
2186 1 : artdaqtest::FragmentBufferTestGenerator gen(ps);
2187 1 : artdaq::FragmentBuffer fp(ps);
2188 1 : fp.SetRequestBuffer(buffer);
2189 :
// beginop marks the start of the phase currently being timed.
2190 1 : auto beginop = std::chrono::steady_clock::now();
2191 :
2192 3 : TLOG(TLVL_INFO) << "Generating " << RATE_TEST_COUNT << " requests BEGIN";
// req_seq is declared outside the loop so the second request batch below continues the numbering.
2193 1 : size_t req_seq = 1;
2194 100001 : for (; req_seq < RATE_TEST_COUNT + 1; ++req_seq)
2195 : {
2196 100000 : buffer->push(req_seq, req_seq);
2197 : }
// NOTE(review): the log text below reads "END.Time" (no space after the period), unlike the other END messages.
2198 3 : TLOG(TLVL_INFO) << "Generating " << RATE_TEST_COUNT << " requests END.Time elapsed = " << artdaq::TimeUtils::GetElapsedTime(beginop)
2199 2 : << "(" << RATE_TEST_COUNT / artdaq::TimeUtils::GetElapsedTime(beginop) << " reqs/s) ";
2200 :
2201 1 : beginop = std::chrono::steady_clock::now();
2202 3 : TLOG(TLVL_INFO) << "Generating/adding " << RATE_TEST_COUNT << " Fragments BEGIN";
2203 1 : fp.AddFragmentsToBuffer(gen.Generate(RATE_TEST_COUNT));
2204 3 : TLOG(TLVL_INFO) << "Generating/adding " << RATE_TEST_COUNT << " Fragments END. Time elapsed=" << artdaq::TimeUtils::GetElapsedTime(beginop)
2205 2 : << " (" << RATE_TEST_COUNT / artdaq::TimeUtils::GetElapsedTime(beginop) << " frags/s)";
2206 :
2207 4 : TRACE_REQUIRE_EQUAL(fp.GetNextSequenceID(), 1);
2208 :
2209 1 : beginop = std::chrono::steady_clock::now();
2210 3 : TLOG(TLVL_INFO) << "Applying requests BEGIN";
2211 1 : artdaq::FragmentPtrs fps;
2212 1 : auto sts = fp.applyRequests(fps);
2213 3 : TLOG(TLVL_INFO) << "Applying requests END. Time elapsed=" << artdaq::TimeUtils::GetElapsedTime(beginop)
2214 2 : << " (" << RATE_TEST_COUNT / artdaq::TimeUtils::GetElapsedTime(beginop) << " reqs/s)";
// All RATE_TEST_COUNT requests are expected to be served by the single applyRequests call;
// only the first returned Fragment is inspected in detail.
2215 4 : TRACE_REQUIRE_EQUAL(fp.GetNextSequenceID(), RATE_TEST_COUNT + 1);
2216 4 : TRACE_REQUIRE_EQUAL(sts, true);
2217 4 : TRACE_REQUIRE_EQUAL(fps.size(), RATE_TEST_COUNT);
2218 4 : TRACE_REQUIRE_EQUAL(fps.front()->fragmentID(), 1);
2219 4 : TRACE_REQUIRE_EQUAL(fps.front()->timestamp(), 1);
2220 4 : TRACE_REQUIRE_EQUAL(fps.front()->sequenceID(), 1);
2221 1 : auto type = artdaq::Fragment::ContainerFragmentType;
2222 4 : TRACE_REQUIRE_EQUAL(fps.front()->type(), type);
2223 1 : BOOST_REQUIRE_GE(fps.front()->sizeBytes(), 2 * sizeof(artdaq::detail::RawFragmentHeader) + sizeof(artdaq::ContainerFragment::Metadata));
2224 1 : auto cf = artdaq::ContainerFragment(*fps.front());
2225 4 : TRACE_REQUIRE_EQUAL(cf.block_count(), 1);
2226 4 : TRACE_REQUIRE_EQUAL(cf.missing_data(), false);
2227 1 : type = artdaq::Fragment::FirstUserFragmentType;
2228 4 : TRACE_REQUIRE_EQUAL(cf.fragment_type(), type);
2229 1 : fps.clear();
2230 :
// NOTE(review): beginop is not reset before this second request batch, so the elapsed time and
// rate logged after the loop also include the applyRequests call and the assertions above.
2231 3 : TLOG(TLVL_INFO) << "Generating " << RATE_TEST_COUNT << " requests BEGIN";
2232 100001 : for (; req_seq < (2 * RATE_TEST_COUNT) + 1; ++req_seq)
2233 : {
2234 100000 : buffer->push(req_seq, req_seq);
2235 : }
2236 3 : TLOG(TLVL_INFO) << "Generating " << RATE_TEST_COUNT << " requests END. Time elapsed = " << artdaq::TimeUtils::GetElapsedTime(beginop)
2237 2 : << "(" << RATE_TEST_COUNT / artdaq::TimeUtils::GetElapsedTime(beginop) << " reqs/s) ";
2238 :
2239 1 : beginop = std::chrono::steady_clock::now();
2240 3 : TLOG(TLVL_INFO) << "Generating/adding " << RATE_TEST_COUNT << " Fragments Individually BEGIN";
// Unbraced loop: only the AddFragmentsToBuffer call on the next line is repeated.
2241 100001 : for (int ii = 0; ii < RATE_TEST_COUNT; ++ii)
2242 100000 : fp.AddFragmentsToBuffer(gen.Generate(1));
2243 3 : TLOG(TLVL_INFO) << "Generating/adding " << RATE_TEST_COUNT << " Fragments Individually END. Time elapsed=" << artdaq::TimeUtils::GetElapsedTime(beginop)
2244 2 : << " (" << RATE_TEST_COUNT / artdaq::TimeUtils::GetElapsedTime(beginop) << " frags/s)";
2245 :
2246 3 : TLOG(TLVL_INFO) << "WindowMode_RateTests test case END";
2247 1 : }
2248 :
2249 2 : BOOST_AUTO_TEST_CASE(CircularBufferMode_RateTests)
2250 : {
2251 1 : artdaq::configureMessageFacility("FragmentBuffer_t", true, MESSAGEFACILITY_DEBUG);
2252 3 : TLOG(TLVL_INFO) << "CircularBufferMode_RateTests test case BEGIN";
2253 1 : fhicl::ParameterSet ps;
2254 2 : ps.put<int>("fragment_id", 1);
2255 2 : ps.put<artdaq::Fragment::timestamp_t>("request_window_offset", 0);
2256 2 : ps.put<artdaq::Fragment::timestamp_t>("request_window_width", 0);
2257 2 : ps.put<size_t>("data_buffer_depth_fragments", RATE_TEST_COUNT / 2);
2258 2 : ps.put<bool>("circular_buffer_mode", true);
2259 3 : ps.put<bool>("receive_requests", true);
2260 3 : ps.put<std::string>("request_mode", "window");
2261 2 : ps.put<size_t>("missing_request_window_timeout_us", 500000);
2262 2 : ps.put<size_t>("window_close_timeout_us", 500000);
2263 :
2264 1 : auto buffer = std::make_shared<artdaq::RequestBuffer>();
2265 1 : buffer->setRunning(true);
2266 1 : artdaqtest::FragmentBufferTestGenerator gen(ps);
2267 1 : artdaq::FragmentBuffer fp(ps);
2268 1 : fp.SetRequestBuffer(buffer);
2269 :
2270 1 : auto beginop = std::chrono::steady_clock::now();
2271 :
2272 3 : TLOG(TLVL_INFO) << "Generating " << RATE_TEST_COUNT << " requests BEGIN";
2273 100001 : for (size_t ii = 1; ii < RATE_TEST_COUNT + 1; ++ii)
2274 : {
2275 100000 : buffer->push(ii, ii);
2276 : }
2277 3 : TLOG(TLVL_INFO) << "Generating " << RATE_TEST_COUNT << " requests END. Time elapsed = " << artdaq::TimeUtils::GetElapsedTime(beginop)
2278 2 : << "(" << RATE_TEST_COUNT / artdaq::TimeUtils::GetElapsedTime(beginop) << " reqs / s) ";
2279 :
2280 1 : beginop = std::chrono::steady_clock::now();
2281 3 : TLOG(TLVL_INFO) << "Generating/adding " << RATE_TEST_COUNT << " Fragments BEGIN";
2282 1 : fp.AddFragmentsToBuffer(gen.Generate(RATE_TEST_COUNT));
2283 3 : TLOG(TLVL_INFO) << "Generating/adding " << RATE_TEST_COUNT << " Fragments END. Time elapsed=" << artdaq::TimeUtils::GetElapsedTime(beginop)
2284 2 : << " (" << RATE_TEST_COUNT / artdaq::TimeUtils::GetElapsedTime(beginop) << " frags/s)";
2285 :
2286 4 : TRACE_REQUIRE_EQUAL(fp.GetNextSequenceID(), 1);
2287 :
2288 1 : beginop = std::chrono::steady_clock::now();
2289 3 : TLOG(TLVL_INFO) << "Applying requests BEGIN";
2290 1 : artdaq::FragmentPtrs fps;
2291 1 : auto sts = fp.applyRequests(fps);
2292 3 : TLOG(TLVL_INFO) << "Applying requests END. Time elapsed=" << artdaq::TimeUtils::GetElapsedTime(beginop)
2293 2 : << " (" << RATE_TEST_COUNT / artdaq::TimeUtils::GetElapsedTime(beginop) << " reqs/s)";
2294 4 : TRACE_REQUIRE_EQUAL(fp.GetNextSequenceID(), RATE_TEST_COUNT + 1);
2295 4 : TRACE_REQUIRE_EQUAL(sts, true);
2296 4 : TRACE_REQUIRE_EQUAL(fps.size(), RATE_TEST_COUNT);
2297 4 : TRACE_REQUIRE_EQUAL(fps.front()->fragmentID(), 1);
2298 4 : TRACE_REQUIRE_EQUAL(fps.front()->timestamp(), 1);
2299 4 : TRACE_REQUIRE_EQUAL(fps.front()->sequenceID(), 1);
2300 1 : auto type = artdaq::Fragment::ContainerFragmentType;
2301 4 : TRACE_REQUIRE_EQUAL(fps.front()->type(), type);
2302 1 : auto cf = artdaq::ContainerFragment(*fps.front());
2303 4 : TRACE_REQUIRE_EQUAL(cf.block_count(), 0);
2304 4 : TRACE_REQUIRE_EQUAL(cf.missing_data(), true);
2305 1 : type = artdaq::Fragment::EmptyFragmentType;
2306 4 : TRACE_REQUIRE_EQUAL(cf.fragment_type(), type);
2307 1 : fps.clear();
2308 :
2309 3 : TLOG(TLVL_INFO) << "CircularBufferMode_RateTests test case END";
2310 1 : }
2311 :
2312 2 : BOOST_AUTO_TEST_CASE(WindowMode_RateTests_threaded)
2313 : {
2314 1 : artdaq::configureMessageFacility("FragmentBuffer_t", true, MESSAGEFACILITY_DEBUG);
2315 3 : TLOG(TLVL_INFO) << "WindowMode_RateTests_threaded test case BEGIN";
2316 1 : fhicl::ParameterSet ps;
2317 2 : ps.put<int>("fragment_id", 1);
2318 2 : ps.put<artdaq::Fragment::timestamp_t>("request_window_offset", 0);
2319 2 : ps.put<artdaq::Fragment::timestamp_t>("request_window_width", 0);
2320 2 : ps.put<size_t>("data_buffer_depth_fragments", RATE_TEST_COUNT);
2321 3 : ps.put<bool>("circular_buffer_mode", false);
2322 3 : ps.put<std::string>("request_mode", "window");
2323 2 : ps.put<bool>("receive_requests", true);
2324 2 : ps.put<size_t>("missing_request_window_timeout_us", 500000);
2325 2 : ps.put<size_t>("window_close_timeout_us", 500000);
2326 :
2327 1 : auto buffer = std::make_shared<artdaq::RequestBuffer>();
2328 1 : buffer->setRunning(true);
2329 1 : artdaqtest::FragmentBufferTestGenerator gen(ps);
2330 1 : artdaq::FragmentBuffer fp(ps);
2331 1 : fp.SetRequestBuffer(buffer);
2332 :
2333 1 : std::atomic<int> thread_sync = 0;
2334 1 : auto gen_requests = [&]() {
2335 1 : ++thread_sync;
2336 28152 : while (thread_sync < 3) {}
2337 1 : auto beginop = std::chrono::steady_clock::now();
2338 3 : TLOG(TLVL_INFO) << "Generating " << RATE_TEST_COUNT << " requests BEGIN";
2339 100001 : for (size_t ii = 1; ii < RATE_TEST_COUNT + 1; ++ii)
2340 : {
2341 100000 : buffer->push(ii, ii);
2342 : }
2343 3 : TLOG(TLVL_INFO) << "Generating " << RATE_TEST_COUNT << " requests END. Time elapsed = " << artdaq::TimeUtils::GetElapsedTime(beginop)
2344 2 : << "(" << RATE_TEST_COUNT / artdaq::TimeUtils::GetElapsedTime(beginop) << " reqs / s) ";
2345 2 : };
2346 :
2347 1 : auto gen_frags = [&]() {
2348 1 : ++thread_sync;
2349 2331 : while (thread_sync < 3) {}
2350 1 : auto beginop = std::chrono::steady_clock::now();
2351 3 : TLOG(TLVL_INFO) << "Generating/adding " << RATE_TEST_COUNT << " Fragments BEGIN";
2352 100001 : for (int ii = 0; ii < RATE_TEST_COUNT; ++ii)
2353 100000 : fp.AddFragmentsToBuffer(gen.Generate(1));
2354 3 : TLOG(TLVL_INFO) << "Generating/adding " << RATE_TEST_COUNT << " Fragments END. Time elapsed=" << artdaq::TimeUtils::GetElapsedTime(beginop)
2355 2 : << " (" << RATE_TEST_COUNT / artdaq::TimeUtils::GetElapsedTime(beginop) << " frags/s)";
2356 2 : };
2357 :
2358 1 : auto apply_requests = [&]() {
2359 1 : ++thread_sync;
2360 1 : while (thread_sync < 3) {}
2361 1 : artdaq::FragmentPtrs fps;
2362 1 : auto begin_test = std::chrono::steady_clock::now();
2363 5 : while (fps.size() < RATE_TEST_COUNT && artdaq::TimeUtils::GetElapsedTime(begin_test) < RATE_TEST_COUNT / 1000) // 1 ms per request
2364 : {
2365 4 : auto beginop = std::chrono::steady_clock::now();
2366 12 : TLOG(TLVL_INFO) << "Applying requests BEGIN";
2367 4 : auto sts = fp.applyRequests(fps);
2368 16 : TRACE_REQUIRE_EQUAL(sts, true);
2369 12 : TLOG(TLVL_INFO) << "Applying requests END. Time elapsed=" << artdaq::TimeUtils::GetElapsedTime(beginop);
2370 : }
2371 1 : if (fps.size() < RATE_TEST_COUNT)
2372 : {
2373 0 : TLOG(TLVL_WARNING) << "Some requests did not return data. Time elapsed=" << artdaq::TimeUtils::GetElapsedTime(begin_test) << ", " << fps.size() << " / " << RATE_TEST_COUNT << " Fragments received";
2374 : }
2375 : else
2376 : {
2377 3 : TLOG(TLVL_INFO) << "All request replies received. Time elapsed=" << artdaq::TimeUtils::GetElapsedTime(begin_test)
2378 2 : << " (" << RATE_TEST_COUNT / artdaq::TimeUtils::GetElapsedTime(begin_test) << " reqs/s)";
2379 4 : TRACE_REQUIRE_EQUAL(fp.GetNextSequenceID(), RATE_TEST_COUNT + 1);
2380 :
2381 4 : TRACE_REQUIRE_EQUAL(fps.size(), RATE_TEST_COUNT);
2382 : }
2383 4 : TRACE_REQUIRE_EQUAL(fps.front()->fragmentID(), 1);
2384 4 : TRACE_REQUIRE_EQUAL(fps.front()->timestamp(), 1);
2385 4 : TRACE_REQUIRE_EQUAL(fps.front()->sequenceID(), 1);
2386 1 : auto type = artdaq::Fragment::ContainerFragmentType;
2387 4 : TRACE_REQUIRE_EQUAL(fps.front()->type(), type);
2388 1 : fps.clear();
2389 2 : };
2390 :
2391 1 : std::thread t1(gen_requests);
2392 1 : std::thread t2(gen_frags);
2393 1 : std::thread t3(apply_requests);
2394 :
2395 1 : t1.join();
2396 1 : t2.join();
2397 1 : t3.join();
2398 :
2399 3 : TLOG(TLVL_INFO) << "WindowMode_RateTests_threaded test case END";
2400 1 : }
2401 :
2402 2 : BOOST_AUTO_TEST_CASE(WaitForDataBufferReady_RaceCondition)
2403 : {
2404 1 : artdaq::configureMessageFacility("FragmentBuffer_t", true, MESSAGEFACILITY_DEBUG);
2405 3 : TLOG(TLVL_INFO) << "WaitForDataBufferReady_RaceCondition test case BEGIN";
2406 1 : fhicl::ParameterSet ps;
2407 2 : ps.put<int>("fragment_id", 1);
2408 2 : ps.put<artdaq::Fragment::timestamp_t>("request_window_offset", 0);
2409 2 : ps.put<artdaq::Fragment::timestamp_t>("request_window_width", 0);
2410 2 : ps.put<size_t>("data_buffer_depth_fragments", 1);
2411 2 : ps.put<bool>("circular_buffer_mode", false);
2412 3 : ps.put<bool>("receive_requests", true);
2413 3 : ps.put<std::string>("request_mode", "window");
2414 :
2415 1 : auto buffer = std::make_shared<artdaq::RequestBuffer>();
2416 1 : buffer->setRunning(true);
2417 1 : artdaqtest::FragmentBufferTestGenerator gen(ps);
2418 1 : artdaq::FragmentBuffer fp(ps);
2419 1 : fp.SetRequestBuffer(buffer);
2420 :
2421 1 : std::atomic<int> thread_sync = 0;
2422 :
2423 1 : auto gen_frags = [&]() {
2424 1 : ++thread_sync;
2425 6542 : while (thread_sync < 2) {}
2426 1 : auto beginop = std::chrono::steady_clock::now();
2427 3 : TLOG(TLVL_INFO) << "Generating/adding " << RATE_TEST_COUNT << " Fragments BEGIN";
2428 100001 : for (int ii = 0; ii < RATE_TEST_COUNT; ++ii)
2429 100000 : fp.AddFragmentsToBuffer(gen.Generate(1));
2430 3 : TLOG(TLVL_INFO) << "Generating/adding " << RATE_TEST_COUNT << " Fragments END. Time elapsed=" << artdaq::TimeUtils::GetElapsedTime(beginop)
2431 2 : << " (" << RATE_TEST_COUNT / artdaq::TimeUtils::GetElapsedTime(beginop) << " frags/s)";
2432 1 : thread_sync = 0;
2433 2 : };
2434 :
2435 1 : auto reset_buffer = [&]() {
2436 1 : ++thread_sync;
2437 1 : while (thread_sync < 2) {}
2438 1 : auto beginop = std::chrono::steady_clock::now();
2439 3 : TLOG(TLVL_INFO) << "Resetting loop BEGIN";
2440 1 : size_t counter = 0;
2441 99490238 : while (thread_sync > 0)
2442 : {
2443 99490237 : fp.Reset(false);
2444 99490237 : counter++;
2445 : }
2446 3 : TLOG(TLVL_INFO) << "Reset " << counter << " times during loop. Time elapsed=" << artdaq::TimeUtils::GetElapsedTime(beginop);
2447 2 : };
2448 :
2449 1 : std::thread t1(gen_frags);
2450 1 : std::thread t2(reset_buffer);
2451 :
2452 1 : t1.join();
2453 1 : t2.join();
2454 :
2455 3 : TLOG(TLVL_INFO) << "WaitForDataBufferReady_RaceCondition test case END";
2456 1 : }
2457 :
2458 : BOOST_AUTO_TEST_SUITE_END()
|