Line data Source code
1 :
2 : #include "artdaq/DAQrate/SharedMemoryEventManager.hh"
3 : #include <sys/wait.h>
4 :
5 : #include <memory>
6 : #include <numeric>
7 :
8 : #include "artdaq-core/Core/StatisticsCollection.hh"
9 : #include "artdaq-core/Data/MetadataFragment.hh"
10 : #include "artdaq-core/Utilities/TraceLock.hh"
11 :
#define TRACE_NAME (app_name + "_SharedMemoryEventManager").c_str()

// clang-format off
// TRACE message levels used in this file, grouped by area.
// Two pairs intentionally or not share a value:
//   TLVL_CONSTRUCTOR == TLVL_DESTRUCTOR == 43
//   TLVL_SHUTDOWN    == TLVL_STARTRUN   == 58

// --- Fragment ingest ---
#define TLVL_ADDFRAGMENT             32
#define TLVL_ADDINITFRAGMENT         33
#define TLVL_DONEWRITINGFRAGMENT     44
#define TLVL_GETFRAGMENTCOUNT        49
#define TLVL_WRITEFRAGMENTHEADER     60

// --- Broadcast segment ---
#define TLVL_BROADCASTFRAGMENT       34
#define TLVL_BROADCASTFRAGMENTS      35
#define TLVL_BROADCASTFRAGMENTS_2    36
#define TLVL_CHECKPENDINGBROADCASTS  37

// --- Event-buffer management ---
#define TLVL_CHECKPENDINGBUFFERS     38
#define TLVL_CHECKPENDINGBUFFERS_2   39
#define TLVL_CHECKPENDINGBUFFERS_3   40
#define TLVL_CHECKPENDINGBUFFERS_4   41
#define TLVL_COMPLETEBUFFER          42
#define TLVL_GETBUFFER               48
#define TLVL_BUFFER                  61
#define TLVL_BUFLCK                  62

// --- Object / run lifecycle ---
#define TLVL_CONSTRUCTOR             43
#define TLVL_DESTRUCTOR              43
#define TLVL_ENDOFDATA               45
#define TLVL_ENDOFDATA_2             46
#define TLVL_ENDRUN                  47
#define TLVL_GETSUBRUN               50
#define TLVL_GETSUBRUN_2             51
#define TLVL_SENDINIT                56
#define TLVL_SENDMETRICS             57
#define TLVL_SHUTDOWN                58
#define TLVL_STARTRUN                58

// --- art process control ---
#define TLVL_PARSEARTCOMMANDLINE     52
#define TLVL_RECONFIGUREART          53
#define TLVL_RUNART                  54
#define TLVL_RUNART_2                55
#define TLVL_UPDATEARTCONFIG         59
// clang-format on
49 :
50 : std::mutex artdaq::SharedMemoryEventManager::sequence_id_mutex_;
51 : std::mutex artdaq::SharedMemoryEventManager::subrun_event_map_mutex_;
52 : const std::string artdaq::SharedMemoryEventManager::
53 : FRAGMENTS_RECEIVED_STAT_KEY("SharedMemoryEventManagerFragmentsReceived");
54 : const std::string artdaq::SharedMemoryEventManager::
55 : EVENTS_RELEASED_STAT_KEY("SharedMemoryEventManagerEventsReleased");
56 :
/**
 * @brief Construct the event manager from its configuration ParameterSet.
 * @param pset Event-builder configuration. Keys read here include buffer_count,
 *        expected_fragments_per_event, max_event_size_bytes / max_fragment_size_bytes,
 *        use_art, broadcast_mode, overwrite_mode, manual_art and the art_* settings.
 * @param art_pset Configuration for the art processes; stored as current_art_pset_ and used
 *        to create current_art_config_file_.
 * @throws cet::exception if the shared-memory segment is not valid (IsValid() is false).
 *
 * Two shared-memory segments are set up: the event segment (the SharedMemoryManager base,
 * keyed by shared_memory_key) and a separate broadcast segment (broadcasts_, keyed by
 * broadcast_shared_memory_key). This object registers as a writer on both.
 */
artdaq::SharedMemoryEventManager::SharedMemoryEventManager(const fhicl::ParameterSet& pset, fhicl::ParameterSet art_pset)
    // Event segment. Per-buffer size is max_event_size_bytes when present, otherwise
    // expected_fragments_per_event * max_fragment_size_bytes. The fourth argument comes from
    // stale_buffer_timeout_usec (default: event_queue_wait_time seconds * 1000000). The last
    // flag is false when broadcast_mode is set (its meaning is defined by SharedMemoryManager).
    : SharedMemoryManager(pset.get<uint32_t>("shared_memory_key", Globals::SharedMemoryKey(0xEE000000)),
                          pset.get<size_t>("buffer_count"),
                          pset.has_key("max_event_size_bytes") ? pset.get<size_t>("max_event_size_bytes") : pset.get<size_t>("expected_fragments_per_event") * pset.get<size_t>("max_fragment_size_bytes"),
                          pset.get<size_t>("stale_buffer_timeout_usec", pset.get<size_t>("event_queue_wait_time", 5) * 1000000),
                          !pset.get<bool>("broadcast_mode", false))
    , num_art_processes_(pset.get<size_t>("art_analyzer_count", 1))
    , num_fragments_per_event_(pset.get<size_t>("expected_fragments_per_event"))
    , queue_size_(pset.get<size_t>("buffer_count"))
    , run_id_(0)
    , subrun_id_(0)
    , max_subrun_event_map_length_(pset.get<size_t>("max_subrun_lookup_table_size", 100))
    , subrun_transition_hold_time_s_(pset.get<double>("subrun_transition_hold_time_s", 0.001))
    , max_event_list_length_(pset.get<size_t>("max_event_list_length", 100))
    , update_run_ids_(pset.get<bool>("update_run_ids_on_new_fragment", true))
    , use_sequence_id_for_event_number_(pset.get<bool>("use_sequence_id_for_event_number", true))
    // Overwrite mode is forced on whenever art is disabled or broadcast_mode is requested.
    , overwrite_mode_(!pset.get<bool>("use_art", true) || pset.get<bool>("overwrite_mode", false) || pset.get<bool>("broadcast_mode", false))
    // Defaults to 1 when send_init_fragments (default true) is set, otherwise 0.
    , init_fragment_count_(pset.get<size_t>("init_fragment_count", pset.get<bool>("send_init_fragments", true) ? 1 : 0))
    , running_(false)
    , buffer_writes_pending_()
    // Falls back to incomplete_event_report_interval_ms, then to -1.
    , open_event_report_interval_ms_(pset.get<int>("open_event_report_interval_ms", pset.get<int>("incomplete_event_report_interval_ms", -1)))
    , last_open_event_report_time_(std::chrono::steady_clock::now())
    , last_backpressure_report_time_(std::chrono::steady_clock::now())
    , last_fragment_header_write_time_(std::chrono::steady_clock::now())
    , event_timing_(pset.get<size_t>("buffer_count"))
    , broadcast_timeout_ms_(pset.get<int>("fragment_broadcast_timeout_ms", 3000))
    , run_event_count_(0)
    , run_incomplete_event_count_(0)
    , subrun_event_count_(0)
    , subrun_incomplete_event_count_(0)
    , oversize_fragment_count_(0)
    , maximum_oversize_fragment_count_(pset.get<int>("maximum_oversize_fragment_count", 1))
    , restart_art_(false)
    , always_restart_art_(pset.get<bool>("restart_crashed_art_processes", true))
    , manual_art_(pset.get<bool>("manual_art", false))
    , current_art_pset_(art_pset)
    , art_cmdline_(pset.get<std::string>("art_command_line", "art -c #CONFIG_FILE#"))
    , art_process_index_offset_(pset.get<size_t>("art_index_offset", 0))
    , minimum_art_lifetime_s_(pset.get<double>("minimum_art_lifetime_s", 2.0))
    , art_event_processing_time_us_(pset.get<size_t>("expected_art_event_processing_time_us", 1000000))
    , requests_(nullptr)
    , tokens_(nullptr)
    , data_pset_(pset)
    // Broadcast segment: key, buffer count, buffer size, then expected art processing time
    // multiplied by buffer_count.
    // NOTE(review): expected_art_event_processing_time_us is read here as int with default
    // 100000, but as size_t with default 1000000 for art_event_processing_time_us_ above.
    // Confirm which default is intended.
    , broadcasts_(pset.get<uint32_t>("broadcast_shared_memory_key", Globals::SharedMemoryKey(0xBB000000)),
                  pset.get<size_t>("broadcast_buffer_count", 10),
                  pset.get<size_t>("broadcast_buffer_size", 0x100000),
                  pset.get<int>("expected_art_event_processing_time_us", 100000) * pset.get<size_t>("buffer_count"), false)
{
	subrun_event_map_[0] = 1;
	// Both segments use a minimum write size of one RawEventHeader plus one RawFragmentHeader.
	SetMinWriteSize(sizeof(detail::RawEventHeader) + sizeof(detail::RawFragmentHeader));
	broadcasts_.SetMinWriteSize(sizeof(detail::RawEventHeader) + sizeof(detail::RawFragmentHeader));

	RegisterWriter();
	broadcasts_.RegisterWriter();

	// With use_art:false no art processes are started, whatever art_analyzer_count says.
	if (!pset.get<bool>("use_art", true))
	{
		TLOG(TLVL_INFO) << "BEGIN SharedMemoryEventManager CONSTRUCTOR with use_art:false";
		num_art_processes_ = 0;
	}
	else
	{
		TLOG(TLVL_INFO) << "BEGIN SharedMemoryEventManager CONSTRUCTOR with use_art:true";
		TLOG(TLVL_CONSTRUCTOR) << "art_pset is " << art_pset.to_string();
	}

	// In manual mode the config file is also given both shared-memory keys
	// (presumably so an externally launched art can locate the segments; confirm in art_config_file).
	if (manual_art_)
		current_art_config_file_ = std::make_shared<art_config_file>(art_pset, GetKey(), GetBroadcastKey());
	else
		current_art_config_file_ = std::make_shared<art_config_file>(art_pset);

	if (overwrite_mode_ && num_art_processes_ > 0)
	{
		TLOG(TLVL_WARNING) << "Art is configured to run, but overwrite mode is enabled! Check your configuration if this in unintentional!";
	}
	else if (overwrite_mode_)
	{
		TLOG(TLVL_INFO) << "Overwrite Mode enabled, no configured art processes at startup";
	}

	// Zero each buffer's pending-write counter up front.
	for (size_t ii = 0; ii < size(); ++ii)
	{
		buffer_writes_pending_[ii] = 0;
		// Make sure the mutexes are created once
		std::lock_guard<std::mutex> lk(buffer_mutexes_[ii]);
	}

	if (!IsValid())
	{
		throw cet::exception(app_name + "_SharedMemoryEventManager") << "Unable to attach to Shared Memory!";  // NOLINT(cert-err60-cpp)
	}

	TLOG(TLVL_CONSTRUCTOR) << "Setting Writer rank to " << my_rank;
	SetRank(my_rank);
	TLOG(TLVL_CONSTRUCTOR) << "Writer Rank is " << GetRank();

	statsHelper_.addMonitoredQuantityName(FRAGMENTS_RECEIVED_STAT_KEY);
	statsHelper_.addMonitoredQuantityName(EVENTS_RELEASED_STAT_KEY);

	// fetch the monitoring parameters and create the MonitoredQuantity instances
	statsHelper_.createCollectors(pset, 100, 30.0, 60.0, EVENTS_RELEASED_STAT_KEY);

	TLOG(TLVL_CONSTRUCTOR) << "END CONSTRUCTOR";
}
161 :
162 16 : artdaq::SharedMemoryEventManager::~SharedMemoryEventManager() noexcept
163 : {
164 30 : TLOG(TLVL_DESTRUCTOR) << "DESTRUCTOR";
165 15 : if (running_)
166 : {
167 : try
168 : {
169 8 : endOfData();
170 : }
171 0 : catch (...)
172 : {
173 : // IGNORED
174 0 : }
175 : }
176 :
177 15 : UnregisterWriter();
178 15 : broadcasts_.UnregisterWriter();
179 30 : TLOG(TLVL_DESTRUCTOR) << "Destructor END";
180 16 : }
181 :
182 2758 : bool artdaq::SharedMemoryEventManager::AddFragment(detail::RawFragmentHeader frag, void* dataPtr)
183 : {
184 2758 : if (!running_) return true;
185 :
186 5516 : TLOG(TLVL_ADDFRAGMENT) << "AddFragment(Header, ptr) BEGIN frag.word_count=" << frag.word_count
187 2758 : << ", sequence_id=" << frag.sequence_id;
188 2758 : auto buffer = getBufferForSequenceID_(frag.sequence_id, true, frag.timestamp);
189 5516 : TLOG(TLVL_ADDFRAGMENT) << "Using buffer " << buffer << " for seqid=" << frag.sequence_id;
190 2758 : if (buffer == -1)
191 : {
192 687 : return false;
193 : }
194 2071 : if (buffer == -2)
195 : {
196 0 : TLOG(TLVL_ERROR) << "Dropping event because data taking has already passed this event number: " << frag.sequence_id;
197 0 : return true;
198 : }
199 :
200 2071 : auto hdr = getEventHeader_(buffer);
201 2071 : if (update_run_ids_)
202 : {
203 2071 : hdr->run_id = run_id_;
204 2071 : hdr->subrun_id = subrun_id_;
205 : }
206 2071 : hdr->subrun_id = GetSubrunForSequenceID(frag.sequence_id);
207 :
208 4142 : TLOG(TLVL_ADDFRAGMENT) << "AddFragment before Write calls";
209 2071 : Write(buffer, dataPtr, frag.word_count * sizeof(RawDataType));
210 :
211 4142 : TLOG(TLVL_ADDFRAGMENT) << "Checking for complete event";
212 2071 : auto fragmentCount = GetFragmentCount(frag.sequence_id);
213 2071 : hdr->is_complete = fragmentCount == num_fragments_per_event_ && buffer_writes_pending_[buffer] == 0;
214 4142 : TLOG(TLVL_ADDFRAGMENT) << "hdr->is_complete=" << std::boolalpha << hdr->is_complete
215 0 : << ", fragmentCount=" << fragmentCount
216 0 : << ", num_fragments_per_event=" << num_fragments_per_event_
217 2071 : << ", buffer_writes_pending_[buffer]=" << buffer_writes_pending_[buffer];
218 :
219 2071 : complete_buffer_(buffer);
220 2071 : if (requests_)
221 : {
222 2071 : requests_->SendRequest(true);
223 : }
224 :
225 4142 : TLOG(TLVL_ADDFRAGMENT) << "AddFragment END";
226 2071 : statsHelper_.addSample(FRAGMENTS_RECEIVED_STAT_KEY, frag.word_count * sizeof(RawDataType));
227 2071 : return true;
228 : }
229 :
230 2071 : bool artdaq::SharedMemoryEventManager::AddFragment(FragmentPtr frag, size_t timeout_usec, FragmentPtr& outfrag)
231 : {
232 4142 : TLOG(TLVL_ADDFRAGMENT) << "AddFragment(FragmentPtr) BEGIN";
233 2071 : auto hdr = *reinterpret_cast<detail::RawFragmentHeader*>(frag->headerAddress()); // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
234 2071 : auto data = frag->headerAddress();
235 2071 : auto start = std::chrono::steady_clock::now();
236 2071 : bool sts = false;
237 6900 : while (!sts && TimeUtils::GetElapsedTimeMicroseconds(start) < timeout_usec)
238 : {
239 2758 : sts = AddFragment(hdr, data);
240 2758 : if (!sts)
241 : {
242 687 : usleep(1000);
243 : }
244 : }
245 2071 : if (!sts)
246 : {
247 0 : outfrag = std::move(frag);
248 : }
249 4142 : TLOG(TLVL_ADDFRAGMENT) << "AddFragment(FragmentPtr) RETURN " << std::boolalpha << sts;
250 2071 : return sts;
251 : }
252 :
253 26 : artdaq::RawDataType* artdaq::SharedMemoryEventManager::WriteFragmentHeader(detail::RawFragmentHeader frag, bool dropIfNoBuffersAvailable)
254 : {
255 26 : if (!running_) return nullptr;
256 50 : TLOG(TLVL_WRITEFRAGMENTHEADER) << "WriteFragmentHeader BEGIN, seqID=" << frag.sequence_id;
257 25 : auto buffer = getBufferForSequenceID_(frag.sequence_id, true, frag.timestamp);
258 :
259 25 : if (buffer < 0)
260 : {
261 2 : if (buffer == -1 && !dropIfNoBuffersAvailable)
262 : {
263 0 : std::unique_lock<std::mutex> bp_lk(sequence_id_mutex_);
264 0 : if (TimeUtils::GetElapsedTime(last_backpressure_report_time_) > 1.0)
265 : {
266 0 : TLOG(TLVL_WARNING) << app_name << ": Back-pressure condition: All Shared Memory buffers have been full for " << TimeUtils::GetElapsedTime(last_fragment_header_write_time_) << " s!";
267 0 : last_backpressure_report_time_ = std::chrono::steady_clock::now();
268 : }
269 0 : if (metricMan)
270 : {
271 0 : metricMan->sendMetric("Back-pressure wait time", TimeUtils::GetElapsedTime(last_fragment_header_write_time_), "s", 1, MetricMode::LastPoint);
272 : }
273 0 : TLOG(TLVL_WRITEFRAGMENTHEADER) << "No shared memory buffers available, seqID=" << frag.sequence_id;
274 0 : return nullptr;
275 0 : }
276 2 : if (buffer == -2)
277 : {
278 6 : TLOG(TLVL_ERROR) << "Dropping fragment with sequence id " << frag.sequence_id << " and fragment id " << frag.fragment_id << " because data taking has already passed this event.";
279 : }
280 : else
281 : {
282 0 : TLOG(TLVL_INFO) << "Dropping fragment with sequence id " << frag.sequence_id << " and fragment id " << frag.fragment_id << " because there is no room in the queue and reliable mode is off.";
283 : }
284 2 : dropped_data_.emplace_back(frag, std::make_unique<Fragment>(frag.word_count - frag.num_words()));
285 2 : auto it = dropped_data_.rbegin();
286 :
287 4 : TLOG(TLVL_WRITEFRAGMENTHEADER) << "Dropping fragment with sequence id " << frag.sequence_id << " and fragment id " << frag.fragment_id << " into "
288 2 : << static_cast<void*>(it->second->dataBegin()) << " sz=" << it->second->dataSizeBytes();
289 :
290 2 : return it->second->dataBegin();
291 : }
292 :
293 23 : last_backpressure_report_time_ = std::chrono::steady_clock::now();
294 23 : last_fragment_header_write_time_ = std::chrono::steady_clock::now();
295 : // Increment this as soon as we know we want to use the buffer
296 23 : buffer_writes_pending_[buffer]++;
297 :
298 23 : if (metricMan)
299 : {
300 161 : metricMan->sendMetric("Input Fragment Rate", 1, "Fragments/s", 1, MetricMode::Rate);
301 : }
302 :
303 46 : TLOG(TLVL_BUFLCK) << "WriteFragmentHeader: obtaining buffer_mutexes lock for buffer " << buffer << ", seqID=" << frag.sequence_id;
304 : ;
305 :
306 23 : std::unique_lock<std::mutex> lk(buffer_mutexes_.at(buffer));
307 :
308 46 : TLOG(TLVL_BUFLCK) << "WriteFragmentHeader: obtained buffer_mutexes lock for buffer " << buffer << ", seqID=" << frag.sequence_id;
309 :
310 23 : auto hdrpos = reinterpret_cast<RawDataType*>(GetWritePos(buffer)); // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
311 23 : Write(buffer, &frag, frag.num_words() * sizeof(RawDataType));
312 :
313 23 : auto pos = reinterpret_cast<RawDataType*>(GetWritePos(buffer)); // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
314 23 : if (frag.word_count - frag.num_words() > 0)
315 : {
316 23 : auto sts = IncrementWritePos(buffer, (frag.word_count - frag.num_words()) * sizeof(RawDataType));
317 :
318 23 : if (!sts)
319 : {
320 0 : reinterpret_cast<detail::RawFragmentHeader*>(hdrpos)->word_count = frag.num_words(); // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
321 0 : reinterpret_cast<detail::RawFragmentHeader*>(hdrpos)->type = Fragment::ErrorFragmentType; // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
322 0 : TLOG(TLVL_ERROR) << "Dropping over-size fragment with sequence id " << frag.sequence_id << " and fragment id " << frag.fragment_id << " because there is no room in the current buffer for this Fragment! (Keeping header)";
323 0 : dropped_data_.emplace_back(frag, std::make_unique<Fragment>(frag.word_count - frag.num_words()));
324 0 : auto it = dropped_data_.rbegin();
325 :
326 0 : oversize_fragment_count_++;
327 :
328 0 : if (maximum_oversize_fragment_count_ > 0 && oversize_fragment_count_ >= maximum_oversize_fragment_count_)
329 : {
330 0 : lk.unlock();
331 0 : throw cet::exception("Too many over-size Fragments received! Please adjust max_event_size_bytes or max_fragment_size_bytes!");
332 : }
333 :
334 0 : TLOG(TLVL_WRITEFRAGMENTHEADER) << "Dropping over-size fragment with sequence id " << frag.sequence_id << " and fragment id " << frag.fragment_id
335 0 : << " into " << static_cast<void*>(it->second->dataBegin());
336 0 : return it->second->dataBegin();
337 : }
338 : }
339 46 : TLOG(TLVL_WRITEFRAGMENTHEADER) << "WriteFragmentHeader END, seqID=" << frag.sequence_id;
340 23 : return pos;
341 23 : }
342 :
/**
 * @brief Finish a write started by WriteFragmentHeader(): update the event header, and if this
 *        was the last outstanding writer for the buffer, run the completion bookkeeping.
 * @param frag Header of the Fragment that was written.
 *
 * If no buffer is found for the sequence ID, the matching scratch entry in dropped_data_ (if
 * any) is erased. Otherwise a -1 lookup result Detaches with an explanatory message.
 * If other writers are still pending on the buffer, only the pending counter is decremented
 * and the function returns early, without sending a request.
 */
void artdaq::SharedMemoryEventManager::DoneWritingFragment(detail::RawFragmentHeader frag)
{
	TLOG(TLVL_DONEWRITINGFRAGMENT) << "DoneWritingFragment BEGIN";

	auto buffer = getBufferForSequenceID_(frag.sequence_id, false, frag.timestamp);
	if (buffer < 0)
	{
		// The fragment was written into a scratch Fragment from dropped_data_; release it.
		// NOTE(review): entries queued by WriteFragmentHeader's over-size path are created
		// while a buffer exists, so this lookup normally succeeds for them. In that case this
		// branch is skipped and their dropped_data_ entry is not erased by this function.
		// TODO confirm they are cleaned up elsewhere.
		for (auto it = dropped_data_.begin(); it != dropped_data_.end(); ++it)
		{
			if (frag.operator==(it->first))  // TODO, ELF 5/26/2023: Workaround until artdaq_core can be fixed for C++20
			{
				dropped_data_.erase(it);
				return;
			}
		}
		if (buffer == -1)
		{
			Detach(true, app_name + "SharedMemoryEventManager",
			       "getBufferForSequenceID_ returned -1 in DoneWritingFragment. This indicates a possible mismatch between expected Fragment count and the actual number of Fragments received.");
		}
		return;
	}

	// NOTE(review): this rewrites the stored header before buffer_mutexes_ is locked below.
	if (!frag.valid)
	{
		UpdateFragmentHeader(buffer, frag);
	}

	statsHelper_.addSample(FRAGMENTS_RECEIVED_STAT_KEY, frag.word_count * sizeof(RawDataType));
	{
		TLOG(TLVL_BUFLCK) << "DoneWritingFragment: obtaining buffer_mutexes lock for buffer " << buffer;

		std::unique_lock<std::mutex> lk(buffer_mutexes_.at(buffer));

		TLOG(TLVL_BUFLCK) << "DoneWritingFragment: obtained buffer_mutexes lock for buffer " << buffer;

		TLOG(TLVL_DONEWRITINGFRAGMENT) << "DoneWritingFragment: Received Fragment with sequence ID " << frag.sequence_id << " and fragment id " << frag.fragment_id << " (type " << static_cast<int>(frag.type) << ")";
		auto hdr = getEventHeader_(buffer);
		if (update_run_ids_)
		{
			hdr->run_id = run_id_;
			hdr->subrun_id = subrun_id_;
		}
		// Always overridden by the sequence-ID -> subrun lookup.
		hdr->subrun_id = GetSubrunForSequenceID(frag.sequence_id);

		TLOG(TLVL_DONEWRITINGFRAGMENT) << "DoneWritingFragment: Updating buffer touch time";
		TouchBuffer(buffer);

		// Another writer is still filling this buffer; it will do the bookkeeping.
		if (buffer_writes_pending_[buffer] > 1)
		{
			TLOG(TLVL_DONEWRITINGFRAGMENT) << "Done writing fragment, but there's another writer. Not doing bookkeeping steps.";
			buffer_writes_pending_[buffer]--;
			return;
		}
		TLOG(TLVL_DONEWRITINGFRAGMENT) << "Done writing fragment, and no other writer. Doing bookkeeping steps.";
		auto frag_count = GetFragmentCount(frag.sequence_id);
		hdr->is_complete = frag_count >= num_fragments_per_event_;

		if (frag_count > num_fragments_per_event_)
		{
			TLOG(TLVL_WARNING) << "DoneWritingFragment: This Event has more Fragments ( " << frag_count << " ) than specified in configuration ( " << num_fragments_per_event_ << " )!"
			                   << " This is probably due to a misconfiguration and is *not* a reliable mode!";
		}

		TLOG(TLVL_DONEWRITINGFRAGMENT) << "DoneWritingFragment: Received Fragment with sequence ID " << frag.sequence_id << " and fragment id " << frag.fragment_id << ", count/expected = " << frag_count << "/" << num_fragments_per_event_;
#if ART_SUPPORTS_DUPLICATE_EVENTS
		if (!hdr->is_complete && released_incomplete_events_.count(frag.sequence_id))
		{
			hdr->is_complete = frag_count >= released_incomplete_events_[frag.sequence_id] && buffer_writes_pending_[buffer] == 0;
		}
#endif

		complete_buffer_(buffer);

		// Move this down here to avoid race condition
		buffer_writes_pending_[buffer]--;
	}
	if (requests_)
	{
		requests_->SendRequest(true);
	}
	TLOG(TLVL_DONEWRITINGFRAGMENT) << "DoneWritingFragment END";
}
426 :
427 2101 : size_t artdaq::SharedMemoryEventManager::GetFragmentCount(Fragment::sequence_id_t seqID, Fragment::type_t type)
428 : {
429 2101 : return GetFragmentCountInBuffer(getBufferForSequenceID_(seqID, false), type);
430 : }
431 :
432 2102 : size_t artdaq::SharedMemoryEventManager::GetFragmentCountInBuffer(int buffer, Fragment::type_t type)
433 : {
434 2102 : if (buffer < 0)
435 : {
436 0 : return 0;
437 : }
438 2102 : ResetReadPos(buffer);
439 2102 : IncrementReadPos(buffer, sizeof(detail::RawEventHeader));
440 :
441 2102 : size_t count = 0;
442 :
443 8318 : while (MoreDataInBuffer(buffer))
444 : {
445 6216 : auto fragHdr = reinterpret_cast<artdaq::detail::RawFragmentHeader*>(GetReadPos(buffer)); // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
446 6216 : IncrementReadPos(buffer, fragHdr->word_count * sizeof(RawDataType));
447 6216 : if (type != Fragment::InvalidFragmentType && fragHdr->type != type)
448 : {
449 : // Skip fragments with the wrong type, as they were over-size and truncated to the header
450 0 : continue;
451 : }
452 12432 : TLOG(TLVL_GETFRAGMENTCOUNT) << "Adding Fragment with size=" << fragHdr->word_count << " to Fragment count";
453 6216 : ++count;
454 : }
455 :
456 2102 : return count;
457 : }
458 :
459 0 : void artdaq::SharedMemoryEventManager::UpdateFragmentHeader(int buffer, artdaq::detail::RawFragmentHeader hdr)
460 : {
461 0 : if (buffer < 0)
462 : {
463 0 : return;
464 : }
465 0 : ResetReadPos(buffer);
466 0 : IncrementReadPos(buffer, sizeof(detail::RawEventHeader));
467 :
468 0 : while (MoreDataInBuffer(buffer))
469 : {
470 0 : auto fragHdr = reinterpret_cast<artdaq::detail::RawFragmentHeader*>(GetReadPos(buffer)); // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
471 0 : if (hdr.fragment_id == fragHdr->fragment_id)
472 : {
473 0 : *fragHdr = hdr;
474 0 : break;
475 : }
476 : }
477 :
478 0 : return;
479 : }
480 :
481 6 : void artdaq::SharedMemoryEventManager::RunArt(size_t process_index, const std::shared_ptr<std::atomic<pid_t>>& pid_out)
482 : {
483 : do
484 : {
485 6 : auto start_time = std::chrono::steady_clock::now();
486 6 : send_init_frags_();
487 18 : TLOG(TLVL_INFO) << "Starting art process with config file " << current_art_config_file_->getFileName();
488 :
489 6 : pid_t pid = 0;
490 :
491 6 : if (!manual_art_)
492 : {
493 6 : pid = fork();
494 6 : if (pid == 0)
495 : { /* child */
496 : // 23-May-2018, KAB: added the setting of the partition number env var
497 : // in the environment of the child art process so that Globals.hh
498 : // will pick it up there and provide it to the artdaq classes that
499 : // are used in data transfers, etc. within the art process.
500 0 : std::string envVarKey = "ARTDAQ_PARTITION_NUMBER";
501 0 : std::string envVarValue = std::to_string(Globals::GetPartitionNumber());
502 0 : if (setenv(envVarKey.c_str(), envVarValue.c_str(), 1) != 0)
503 : {
504 0 : TLOG(TLVL_ERROR) << "Error setting environment variable \"" << envVarKey
505 0 : << "\" in the environment of a child art process. "
506 0 : << "This may result in incorrect TCP port number "
507 0 : << "assignments or other issues, and data may "
508 0 : << "not flow through the system correctly.";
509 : }
510 0 : envVarKey = "ARTDAQ_APPLICATION_NAME";
511 0 : envVarValue = app_name;
512 0 : if (setenv(envVarKey.c_str(), envVarValue.c_str(), 1) != 0)
513 : {
514 0 : TLOG(TLVL_RUNART) << "Error setting environment variable \"" << envVarKey
515 0 : << "\" in the environment of a child art process. ";
516 : }
517 0 : envVarKey = "ARTDAQ_RANK";
518 0 : envVarValue = std::to_string(my_rank);
519 0 : if (setenv(envVarKey.c_str(), envVarValue.c_str(), 1) != 0)
520 : {
521 0 : TLOG(TLVL_RUNART) << "Error setting environment variable \"" << envVarKey
522 0 : << "\" in the environment of a child art process. ";
523 : }
524 :
525 0 : TLOG(TLVL_RUNART_2) << "Parsing art command line";
526 0 : auto args = parse_art_command_line_(current_art_config_file_, process_index);
527 :
528 0 : TLOG(TLVL_RUNART_2) << "Calling execvp with application name " << args[0];
529 0 : execvp(args[0], &args[0]);
530 :
531 0 : TLOG(TLVL_RUNART_2) << "Application exited, cleaning up";
532 0 : for (auto& arg : args)
533 : {
534 0 : delete[] arg;
535 : }
536 :
537 0 : exit(1);
538 0 : }
539 : }
540 : else
541 : {
542 : // Using cin/cout here to ensure console is active (artdaqDriver)
543 0 : std::cout << "Please run the following command in a separate terminal:" << std::endl
544 0 : << "art -c " << current_art_config_file_->getFileName() << std::endl
545 0 : << "Then, in a third terminal, execute: \"ps aux|grep [a]rt -c " << current_art_config_file_->getFileName() << "\" and note the PID of the art process." << std::endl
546 0 : << "Finally, return to this window and enter the pid: " << std::endl;
547 0 : std::cin >> pid;
548 : }
549 6 : *pid_out = pid;
550 :
551 18 : TLOG(TLVL_INFO) << "PID of new art process is " << pid;
552 : {
553 6 : std::unique_lock<std::mutex> lk(art_process_mutex_);
554 6 : art_processes_.insert(pid);
555 6 : }
556 : siginfo_t status;
557 6 : auto sts = 0;
558 6 : if (!manual_art_)
559 : {
560 6 : sts = waitid(P_PID, pid, &status, WEXITED);
561 : }
562 : else
563 : {
564 0 : while (kill(pid, 0) >= 0) usleep(10000);
565 :
566 0 : TLOG(TLVL_INFO) << "Faking good exit status, please see art process for actual exit status!";
567 0 : status.si_code = CLD_EXITED;
568 0 : status.si_status = 0;
569 : }
570 18 : TLOG(TLVL_INFO) << "Removing PID " << pid << " from process list";
571 : {
572 6 : std::unique_lock<std::mutex> lk(art_process_mutex_);
573 6 : art_processes_.erase(pid);
574 6 : }
575 6 : if (sts < 0)
576 : {
577 0 : TLOG(TLVL_WARNING) << "Error occurred in waitid for art process " << pid << ": " << errno << " (" << strerror(errno) << ").";
578 : }
579 6 : else if (status.si_code == CLD_EXITED && status.si_status == 0)
580 : {
581 12 : TLOG(TLVL_INFO) << "art process " << pid << " exited normally, " << (restart_art_ ? "restarting" : "not restarting");
582 4 : }
583 : else
584 : {
585 2 : auto art_lifetime = TimeUtils::GetElapsedTime(start_time);
586 2 : if (art_lifetime < minimum_art_lifetime_s_)
587 : {
588 2 : restart_art_ = false;
589 : }
590 :
591 2 : auto exit_type = "exited with status code";
592 2 : switch (status.si_code)
593 : {
594 0 : case CLD_DUMPED:
595 : case CLD_KILLED:
596 0 : exit_type = "was killed with signal";
597 0 : break;
598 2 : case CLD_EXITED:
599 : default:
600 2 : break;
601 : }
602 :
603 6 : TLOG((restart_art_ ? TLVL_WARNING : TLVL_ERROR))
604 2 : << "art process " << pid << " " << exit_type << " " << status.si_status
605 2 : << (status.si_code == CLD_DUMPED ? " (core dumped)" : "")
606 2 : << " after running for " << std::setprecision(2) << std::fixed << art_lifetime << " seconds, "
607 4 : << (restart_art_ ? "restarting" : "not restarting");
608 : }
609 6 : } while (restart_art_);
610 6 : }
611 :
612 17 : void artdaq::SharedMemoryEventManager::StartArt()
613 : {
614 17 : size_t initialCount = GetAttachedCount();
615 17 : restart_art_ = always_restart_art_;
616 17 : if (num_art_processes_ == 0)
617 : {
618 11 : return;
619 : }
620 12 : for (size_t ii = 0; ii < num_art_processes_; ++ii)
621 : {
622 6 : StartArtProcess(current_art_pset_, ii);
623 : }
624 6 : auto startTime = std::chrono::steady_clock::now();
625 6 : while (GetAttachedCount() - initialCount != num_art_processes_)
626 : {
627 0 : TLOG(TLVL_INFO) << "Waiting for all art processes to connect to shared memory, " << TimeUtils::GetElapsedTime(startTime) << " s elapsed.";
628 0 : std::this_thread::sleep_for(std::chrono::seconds(1));
629 : }
630 : }
631 :
632 6 : pid_t artdaq::SharedMemoryEventManager::StartArtProcess(fhicl::ParameterSet pset, size_t process_index)
633 : {
634 : static std::mutex start_art_mutex;
635 6 : std::unique_lock<std::mutex> lk(start_art_mutex);
636 : // TraceLock lk(start_art_mutex, 15, "StartArtLock");
637 6 : restart_art_ = always_restart_art_;
638 6 : auto initialCount = GetAttachedCount();
639 6 : auto startTime = std::chrono::steady_clock::now();
640 :
641 6 : if (pset != current_art_pset_ || !current_art_config_file_)
642 : {
643 0 : current_art_pset_ = pset;
644 0 : if (manual_art_)
645 0 : current_art_config_file_ = std::make_shared<art_config_file>(pset, GetKey(), GetBroadcastKey());
646 : else
647 0 : current_art_config_file_ = std::make_shared<art_config_file>(pset);
648 : }
649 6 : std::shared_ptr<std::atomic<pid_t>> pid(new std::atomic<pid_t>(-1));
650 12 : boost::thread thread([this, process_index, pid] { RunArt(process_index, pid); });
651 6 : thread.detach();
652 :
653 6 : auto currentCount = GetAttachedCount() - initialCount;
654 37 : while ((currentCount < 1 || *pid <= 0) && (TimeUtils::GetElapsedTime(startTime) < 5 || manual_art_))
655 : {
656 31 : usleep(10000);
657 31 : currentCount = GetAttachedCount() - initialCount;
658 : }
659 6 : if ((currentCount < 1 || *pid <= 0) && manual_art_)
660 : {
661 0 : TLOG(TLVL_WARNING) << "Manually-started art process has not connected to shared memory or has bad PID: connected:" << currentCount << ", PID:" << pid;
662 0 : return 0;
663 : }
664 6 : if (currentCount < 1 || *pid <= 0)
665 : {
666 0 : TLOG(TLVL_WARNING) << "art process has not started after 5s. Check art configuration!"
667 0 : << " (pid=" << *pid << ", attachedCount=" << currentCount << ")";
668 0 : return 0;
669 : }
670 :
671 18 : TLOG(TLVL_INFO) << std::setw(4) << std::fixed << "art initialization took "
672 12 : << TimeUtils::GetElapsedTime(startTime) << " seconds.";
673 :
674 6 : return *pid;
675 6 : }
676 :
677 4 : void artdaq::SharedMemoryEventManager::ShutdownArtProcesses(std::set<pid_t>& pids)
678 : {
679 4 : restart_art_ = false;
680 : // current_art_config_file_ = nullptr;
681 : // current_art_pset_ = fhicl::ParameterSet();
682 :
683 349 : auto check_pids = [&](bool print) {
684 349 : std::unique_lock<std::mutex> lk(art_process_mutex_);
685 694 : for (auto pid = pids.begin(); pid != pids.end();)
686 : {
687 : // 08-May-2018, KAB: protect against killing invalid PIDS
688 :
689 345 : if (*pid <= 0)
690 : {
691 0 : TLOG(TLVL_WARNING) << "Removing an invalid PID (" << *pid
692 0 : << ") from the shutdown list.";
693 0 : pid = pids.erase(pid);
694 : }
695 345 : else if (kill(*pid, 0) < 0)
696 : {
697 0 : pid = pids.erase(pid);
698 : }
699 : else
700 : {
701 345 : if (print)
702 : {
703 0 : std::cout << *pid << " ";
704 : }
705 345 : ++pid;
706 : }
707 : }
708 349 : };
709 349 : auto count_pids = [&]() {
710 349 : std::unique_lock<std::mutex> lk(art_process_mutex_);
711 698 : return pids.size();
712 349 : };
713 4 : check_pids(false);
714 4 : if (count_pids() == 0)
715 : {
716 0 : TLOG(TLVL_SHUTDOWN) << "All art processes already exited, nothing to do.";
717 0 : usleep(1000);
718 4 : return;
719 : }
720 :
721 4 : if (!manual_art_)
722 : {
723 4 : int graceful_wait_ms = art_event_processing_time_us_ * size() * 10 / 1000;
724 4 : int gentle_wait_ms = art_event_processing_time_us_ * size() * 2 / 1000;
725 4 : int int_wait_ms = art_event_processing_time_us_ * size() / 1000;
726 4 : auto shutdown_start = std::chrono::steady_clock::now();
727 :
728 : // if (!overwrite_mode_)
729 : {
730 8 : TLOG(TLVL_SHUTDOWN) << "Waiting up to " << graceful_wait_ms << " ms for all art processes to exit gracefully";
731 345 : for (int ii = 0; ii < graceful_wait_ms; ++ii)
732 : {
733 345 : usleep(1000);
734 :
735 345 : check_pids(false);
736 345 : if (count_pids() == 0)
737 : {
738 12 : TLOG(TLVL_INFO) << "All art processes exited after " << TimeUtils::GetElapsedTimeMilliseconds(shutdown_start) << " ms.";
739 4 : return;
740 : }
741 : }
742 : }
743 :
744 : {
745 0 : TLOG(TLVL_SHUTDOWN) << "Gently informing art processes that it is time to shut down";
746 0 : std::unique_lock<std::mutex> lk(art_process_mutex_);
747 0 : for (auto pid : pids)
748 : {
749 0 : TLOG(TLVL_SHUTDOWN) << "Sending SIGQUIT to pid " << pid;
750 0 : kill(pid, SIGQUIT);
751 : }
752 0 : }
753 :
754 0 : TLOG(TLVL_SHUTDOWN) << "Waiting up to " << gentle_wait_ms << " ms for all art processes to exit from SIGQUIT";
755 0 : for (int ii = 0; ii < gentle_wait_ms; ++ii)
756 : {
757 0 : usleep(1000);
758 :
759 0 : check_pids(false);
760 0 : if (count_pids() == 0)
761 : {
762 0 : TLOG(TLVL_INFO) << "All art processes exited after " << TimeUtils::GetElapsedTimeMilliseconds(shutdown_start) << " ms (SIGQUIT).";
763 0 : return;
764 : }
765 : }
766 :
767 : {
768 0 : TLOG(TLVL_SHUTDOWN) << "Insisting that the art processes shut down";
769 0 : std::unique_lock<std::mutex> lk(art_process_mutex_);
770 0 : for (auto pid : pids)
771 : {
772 0 : kill(pid, SIGINT);
773 : }
774 0 : }
775 :
776 0 : TLOG(TLVL_SHUTDOWN) << "Waiting up to " << int_wait_ms << " ms for all art processes to exit from SIGINT";
777 0 : for (int ii = 0; ii < int_wait_ms; ++ii)
778 : {
779 0 : usleep(1000);
780 :
781 0 : check_pids(false);
782 :
783 0 : if (count_pids() == 0)
784 : {
785 0 : TLOG(TLVL_INFO) << "All art processes exited after " << TimeUtils::GetElapsedTimeMilliseconds(shutdown_start) << " ms (SIGINT).";
786 0 : return;
787 : }
788 : }
789 :
790 0 : TLOG(TLVL_SHUTDOWN) << "Killing remaning art processes with extreme prejudice";
791 0 : while (count_pids() > 0)
792 : {
793 : {
794 0 : std::unique_lock<std::mutex> lk(art_process_mutex_);
795 0 : kill(*pids.begin(), SIGKILL);
796 0 : usleep(1000);
797 0 : }
798 0 : check_pids(false);
799 : }
800 0 : TLOG(TLVL_INFO) << "All art processes exited after " << TimeUtils::GetElapsedTimeMilliseconds(shutdown_start) << " ms (SIGKILL).";
801 : }
802 : else
803 : {
804 0 : std::cout << "Please shut down all art processes, then hit return/enter" << std::endl;
805 0 : while (count_pids() > 0)
806 : {
807 0 : std::cout << "The following PIDs are running: ";
808 0 : check_pids(true);
809 0 : std::cout << std::endl;
810 0 : usleep(500000);
811 : }
812 : }
813 : }
814 :
815 1 : void artdaq::SharedMemoryEventManager::ReconfigureArt(fhicl::ParameterSet art_pset, run_id_t newRun, int n_art_processes)
816 : {
817 2 : TLOG(TLVL_RECONFIGUREART) << "ReconfigureArt BEGIN";
818 1 : if (restart_art_ || !always_restart_art_) // Art is running
819 : {
820 0 : endOfData();
821 : }
822 11 : for (size_t ii = 0; ii < broadcasts_.size(); ++ii)
823 : {
824 10 : broadcasts_.MarkBufferEmpty(ii, true);
825 : }
826 1 : if (newRun == 0)
827 : {
828 1 : newRun = run_id_ + 1;
829 : }
830 :
831 1 : if (art_pset != current_art_pset_ || !current_art_config_file_)
832 : {
833 1 : current_art_pset_ = art_pset;
834 1 : if (manual_art_)
835 0 : current_art_config_file_ = std::make_shared<art_config_file>(art_pset, GetKey(), GetBroadcastKey());
836 : else
837 1 : current_art_config_file_ = std::make_shared<art_config_file>(art_pset);
838 : }
839 :
840 1 : if (n_art_processes != -1)
841 : {
842 0 : TLOG(TLVL_INFO) << "Setting number of art processes to " << n_art_processes;
843 0 : num_art_processes_ = n_art_processes;
844 : }
845 1 : startRun(newRun);
846 2 : TLOG(TLVL_RECONFIGUREART) << "ReconfigureArt END";
847 1 : }
848 :
// Stop the current run and drain the shared-memory event store.
// Sequence of operations:
//   1. Mark the run stopped, forget collected init Fragments and disable art restarts.
//   2. Wait (up to 1 s) for in-flight Fragment writes (buffer_writes_pending_) to reach zero.
//   3. Try to complete still-active buffers via complete_buffer_.
//   4. Wait for outstanding buffers (read-ready plus not-write-ready) to drain, with a timeout that
//      restarts whenever the outstanding count changes; also stops if no art processes remain.
//   5. Broadcast an EndOfData Fragment (clearing broadcast buffers and retrying once on failure).
//   6. Shut down remaining art processes, then mark every event buffer empty.
// Always returns true.
bool artdaq::SharedMemoryEventManager::endOfData()
{
	running_ = false;
	{
		std::lock_guard<std::mutex> lk(init_fragments_mutex_);
		init_fragment_map_.clear();
	}
	init_frags_sent_ = false;
	TLOG(TLVL_ENDOFDATA) << "SharedMemoryEventManager::endOfData";
	restart_art_ = false;

	// Give in-progress Fragment writes up to 1 s (polling every 10 ms) to finish.
	auto start = std::chrono::steady_clock::now();
	auto pendingWriteCount = std::accumulate(buffer_writes_pending_.begin(), buffer_writes_pending_.end(), 0, [](int a, auto& b) { return a + b.second.load(); });
	TLOG(TLVL_ENDOFDATA) << "endOfData: Waiting for " << pendingWriteCount << " pending writes to complete";
	while (pendingWriteCount > 0 && TimeUtils::GetElapsedTimeMicroseconds(start) < 1000000)
	{
		usleep(10000);
		pendingWriteCount = std::accumulate(buffer_writes_pending_.begin(), buffer_writes_pending_.end(), 0, [](int a, auto& b) { return a + b.second.load(); });
	}

	size_t initialStoreSize = GetOpenEventCount();
	TLOG(TLVL_ENDOFDATA) << "endOfData: Flushing " << initialStoreSize
	                     << " stale events from the SharedMemoryEventManager.";
	// NOTE(review): complete_buffer_ only moves a buffer out of active_buffers_ when its header is
	// marked complete; an incomplete head buffer stays put, so this loop may retry the same buffer
	// until `counter` is exhausted. The counter bounds the loop either way.
	int counter = initialStoreSize;
	while (!active_buffers_.empty() && counter > 0)
	{
		complete_buffer_(*active_buffers_.begin());
		counter--;
	}
	TLOG(TLVL_ENDOFDATA) << "endOfData: Done flushing, there are now " << GetOpenEventCount()
	                     << " stale events in the SharedMemoryEventManager.";

	// "Outstanding" = buffers ready to be read plus buffers not currently available for writing.
	TLOG(TLVL_ENDOFDATA) << "Waiting for " << (ReadReadyCount() + (size() - WriteReadyCount(overwrite_mode_))) << " outstanding buffers...";
	start = std::chrono::steady_clock::now();
	auto lastReadCount = ReadReadyCount() + (size() - WriteReadyCount(overwrite_mode_));
	// Allow one art_event_processing_time_us_ per outstanding buffer (at least one); 0 disables the timeout.
	auto end_of_data_wait_us = art_event_processing_time_us_ * (lastReadCount > 0 ? lastReadCount : 1);  // size();

	// Poll interval is capped at 100 ms.
	auto outstanding_buffer_wait_time = art_event_processing_time_us_ > 100000 ? 100000 : art_event_processing_time_us_;

	// We will wait until no buffer has been read for the end of data wait seconds, or no art processes are left.
	while (lastReadCount > 0 && (end_of_data_wait_us == 0 || TimeUtils::GetElapsedTimeMicroseconds(start) < end_of_data_wait_us) && get_art_process_count_() > 0)
	{
		auto temp = ReadReadyCount() + (size() - WriteReadyCount(overwrite_mode_));
		if (temp != lastReadCount)
		{
			// Progress was made: restart the timeout window.
			TLOG(TLVL_ENDOFDATA_2) << "Waiting for " << temp << " outstanding buffers...";
			lastReadCount = temp;
			start = std::chrono::steady_clock::now();
		}
		if (lastReadCount > 0)
		{
			TLOG(TLVL_ENDOFDATA_2) << "About to sleep " << outstanding_buffer_wait_time << " us - lastReadCount=" << lastReadCount << " size=" << size() << " end_of_data_wait_us=" << end_of_data_wait_us;
			usleep(outstanding_buffer_wait_time);
		}
	}

	TLOG(TLVL_ENDOFDATA) << "endOfData: After wait for outstanding buffers. Still outstanding: " << lastReadCount << ", time waited: "
	                     << TimeUtils::GetElapsedTime(start) << " s / " << (end_of_data_wait_us / 1000000.0) << " s, art process count: " << get_art_process_count_();

	TLOG(TLVL_ENDOFDATA) << "endOfData: Broadcasting EndOfData Fragment";
	FragmentPtrs broadcast;
	broadcast.emplace_back(Fragment::eodFrag(GetBufferCount()));
	bool success = broadcastFragments_(broadcast);
	if (!success)
	{
		// Make room in the broadcast memory and retry once; the retry's result is not checked.
		TLOG(TLVL_ENDOFDATA) << "endOfData: Clearing buffers to make room for EndOfData Fragment";
		for (size_t ii = 0; ii < broadcasts_.size(); ++ii)
		{
			broadcasts_.MarkBufferEmpty(ii, true);
		}
		broadcastFragments_(broadcast);
	}
	auto endOfDataProcessingStart = std::chrono::steady_clock::now();
	while (get_art_process_count_() > 0)
	{
		TLOG(TLVL_ENDOFDATA) << "There are " << get_art_process_count_() << " art processes remaining. Proceeding to shutdown.";

		ShutdownArtProcesses(art_processes_);
	}
	TLOG(TLVL_ENDOFDATA) << "It took " << TimeUtils::GetElapsedTime(endOfDataProcessingStart) << " s for all art processes to close after sending EndOfData Fragment";

	ResetAttachedCount();

	// With art gone, forcibly return every event buffer to the Empty state.
	TLOG(TLVL_ENDOFDATA) << "endOfData: Clearing buffers";
	for (size_t ii = 0; ii < size(); ++ii)
	{
		MarkBufferEmpty(ii, true);
	}

	released_events_.clear();
	released_incomplete_events_.clear();

	TLOG(TLVL_ENDOFDATA) << "endOfData END";
	TLOG(TLVL_INFO) << "EndOfData Complete. There were " << GetLastSeenBufferID() << " buffers processed.";
	return true;
}
945 :
946 17 : void artdaq::SharedMemoryEventManager::startRun(run_id_t runID)
947 : {
948 17 : running_ = true;
949 : {
950 17 : std::lock_guard<std::mutex> lk(init_fragments_mutex_);
951 17 : init_fragment_map_.clear();
952 17 : }
953 17 : init_frags_sent_ = false;
954 17 : statsHelper_.resetStatistics();
955 34 : TLOG(TLVL_STARTRUN) << "startRun: Clearing broadcast buffers";
956 187 : for (size_t ii = 0; ii < broadcasts_.size(); ++ii)
957 : {
958 170 : broadcasts_.MarkBufferEmpty(ii, true);
959 : }
960 17 : released_events_.clear();
961 17 : released_incomplete_events_.clear();
962 17 : StartArt();
963 17 : run_id_ = runID;
964 17 : subrun_id_ = 1;
965 : {
966 17 : std::unique_lock<std::mutex> lk(subrun_event_map_mutex_);
967 17 : subrun_event_map_.clear();
968 17 : subrun_event_map_[0] = 1;
969 17 : }
970 17 : run_event_count_ = 0;
971 17 : run_incomplete_event_count_ = 0;
972 17 : requests_ = std::make_unique<RequestSender>(data_pset_);
973 17 : if (requests_)
974 : {
975 17 : requests_->SetRunNumber(static_cast<uint32_t>(run_id_));
976 : }
977 51 : if (data_pset_.has_key("routing_token_config"))
978 : {
979 0 : auto rmPset = data_pset_.get<fhicl::ParameterSet>("routing_token_config");
980 0 : if (rmPset.get<bool>("use_routing_manager", false))
981 : {
982 0 : tokens_ = std::make_unique<TokenSender>(rmPset);
983 0 : tokens_->SetRunNumber(static_cast<uint32_t>(run_id_));
984 0 : tokens_->SendRoutingToken(queue_size_, run_id_);
985 : }
986 0 : }
987 34 : TLOG(TLVL_STARTRUN) << "Starting run " << run_id_
988 0 : << ", max queue size = "
989 0 : << queue_size_
990 0 : << ", queue size = "
991 17 : << GetLockedBufferCount();
992 17 : if (metricMan)
993 : {
994 119 : metricMan->sendMetric("Run Number", static_cast<uint64_t>(run_id_), "Run", 1, MetricMode::LastPoint | MetricMode::Persist);
995 : }
996 17 : }
997 :
998 1 : bool artdaq::SharedMemoryEventManager::endRun()
999 : {
1000 3 : TLOG(TLVL_INFO) << "Ending run " << run_id_;
1001 2 : TLOG(TLVL_ENDRUN) << "Shutting down RequestSender";
1002 1 : requests_.reset(nullptr);
1003 2 : TLOG(TLVL_ENDRUN) << "Shutting down TokenSender";
1004 1 : tokens_.reset(nullptr);
1005 :
1006 3 : TLOG(TLVL_INFO) << "Run " << run_id_ << " has ended. There were " << run_event_count_ << " events in this run.";
1007 1 : run_event_count_ = 0;
1008 1 : run_incomplete_event_count_ = 0;
1009 1 : oversize_fragment_count_ = 0;
1010 : {
1011 1 : std::unique_lock<std::mutex> lk(subrun_event_map_mutex_);
1012 1 : subrun_event_map_.clear();
1013 1 : subrun_event_map_[0] = 1;
1014 1 : }
1015 1 : return true;
1016 : }
1017 :
1018 5 : void artdaq::SharedMemoryEventManager::rolloverSubrun(sequence_id_t boundary, subrun_id_t subrun, bool sendFragment)
1019 : {
1020 : // Generated EndOfSubrun Fragments have Sequence ID 0 and should be ignored
1021 5 : if (boundary == 0 || boundary == Fragment::InvalidSequenceID)
1022 : {
1023 2 : return;
1024 : }
1025 :
1026 5 : std::unique_lock<std::mutex> lk(subrun_event_map_mutex_);
1027 :
1028 : // Don't re-rollover to an already-defined subrun
1029 5 : if (!subrun_event_map_.empty() && subrun_event_map_.rbegin()->second >= subrun)
1030 : {
1031 2 : return;
1032 : }
1033 9 : TLOG(TLVL_INFO) << "Will roll over to subrun " << subrun << " when I reach Sequence ID " << (boundary + 1);
1034 3 : subrun_event_map_[boundary + 1] = subrun;
1035 3 : while (subrun_event_map_.size() > max_subrun_event_map_length_)
1036 : {
1037 0 : subrun_event_map_.erase(subrun_event_map_.begin());
1038 : }
1039 :
1040 3 : if (sendFragment)
1041 : {
1042 0 : auto endOfSubrunFrag = artdaq::MetadataFragment::CreateEndOfSubrunFragment(my_rank, boundary, subrun, 0);
1043 0 : BroadcastFragment(endOfSubrunFrag);
1044 0 : }
1045 5 : }
1046 :
1047 2 : void artdaq::SharedMemoryEventManager::rolloverSubrun(bool sendFragment)
1048 : {
1049 2 : Fragment::sequence_id_t seqID = 0;
1050 2 : subrun_id_t subrun = 0;
1051 : {
1052 2 : std::unique_lock<std::mutex> lk(subrun_event_map_mutex_);
1053 5 : for (auto& it : subrun_event_map_)
1054 : {
1055 3 : if (it.first >= seqID)
1056 : {
1057 3 : seqID = it.first + 1;
1058 : }
1059 3 : if (it.second >= subrun)
1060 : {
1061 3 : subrun = it.second + 1;
1062 : }
1063 : }
1064 2 : }
1065 2 : rolloverSubrun(seqID, subrun, sendFragment);
1066 2 : }
1067 :
1068 0 : void artdaq::SharedMemoryEventManager::sendMetrics()
1069 : {
1070 0 : if (metricMan)
1071 : {
1072 0 : metricMan->sendMetric("Open Event Count", GetOpenEventCount(), "events", 1, MetricMode::LastPoint);
1073 0 : metricMan->sendMetric("Pending Event Count", GetPendingEventCount(), "events", 1, MetricMode::LastPoint);
1074 : }
1075 :
1076 0 : if (open_event_report_interval_ms_ > 0 && GetLockedBufferCount() != 0u)
1077 : {
1078 0 : if (TimeUtils::GetElapsedTimeMilliseconds(last_open_event_report_time_) < static_cast<size_t>(open_event_report_interval_ms_))
1079 : {
1080 0 : return;
1081 : }
1082 :
1083 0 : last_open_event_report_time_ = std::chrono::steady_clock::now();
1084 0 : std::ostringstream oss;
1085 0 : oss << "Open Events (expecting " << num_fragments_per_event_ << " Fragments): ";
1086 0 : for (auto& ev : active_buffers_)
1087 : {
1088 0 : auto hdr = getEventHeader_(ev);
1089 0 : oss << hdr->sequence_id << " (has " << GetFragmentCount(hdr->sequence_id) << " Fragments), ";
1090 : }
1091 0 : TLOG(TLVL_SENDMETRICS) << oss.str();
1092 0 : }
1093 : }
1094 :
1095 8862 : artdaq::detail::RawEventHeader* artdaq::SharedMemoryEventManager::getEventHeader_(int buffer)
1096 : {
1097 8862 : return reinterpret_cast<detail::RawEventHeader*>(GetBufferStart(buffer)); // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
1098 : }
1099 :
1100 2565 : artdaq::SharedMemoryEventManager::subrun_id_t artdaq::SharedMemoryEventManager::GetSubrunForSequenceID(Fragment::sequence_id_t seqID)
1101 : {
1102 2565 : subrun_id_t subrun = 1;
1103 2565 : if (init_fragment_count_ > 0)
1104 : {
1105 6 : TLOG(TLVL_GETSUBRUN_2) << "init_fragment_count > 0 (processing art events): Decoding subrun from sequenceID " << seqID;
1106 3 : subrun = seqID >> 32;
1107 : }
1108 : else
1109 : {
1110 2562 : std::unique_lock<std::mutex> lk(subrun_event_map_mutex_);
1111 :
1112 5124 : TLOG(TLVL_GETSUBRUN_2) << "GetSubrunForSequenceID BEGIN map size = " << subrun_event_map_.size();
1113 2562 : auto it = subrun_event_map_.begin();
1114 :
1115 5127 : while (it->first <= seqID && it != subrun_event_map_.end())
1116 : {
1117 5130 : TLOG(TLVL_GETSUBRUN_2) << "Map has sequence ID " << it->first << ", subrun " << it->second << " (looking for <= " << seqID << ")";
1118 2565 : subrun = it->second;
1119 2565 : ++it;
1120 : }
1121 2562 : }
1122 :
1123 5130 : TLOG(TLVL_GETSUBRUN) << "GetSubrunForSequenceID returning subrun " << subrun << " for sequence ID " << seqID;
1124 2565 : return subrun;
1125 : }
1126 :
// Find, or optionally allocate, the event buffer for a sequence ID.
//   seqID      - sequence ID of the event
//   create_new - if false, only an existing buffer is returned
//   timestamp  - stored in the header of a newly-initialized buffer (and used as the event number
//                when use_sequence_id_for_event_number_ is false)
// Returns:
//   >= 0  index of the buffer already holding seqID, or of a newly-initialized buffer
//   -1    no existing buffer and (create_new is false or no writable buffer was available)
//   -2    the event was already released to art (only when !ART_SUPPORTS_DUPLICATE_EVENTS)
// Holds sequence_id_mutex_ for the whole call. When a new buffer is set up it also takes that
// buffer's mutex, and (via GetSubrunForSequenceID) possibly subrun_event_map_mutex_.
int artdaq::SharedMemoryEventManager::getBufferForSequenceID_(Fragment::sequence_id_t seqID, bool create_new, Fragment::timestamp_t timestamp)
{
	TLOG(TLVL_GETBUFFER) << "getBufferForSequenceID " << seqID << " BEGIN";
	std::unique_lock<std::mutex> lk(sequence_id_mutex_);

	TLOG(TLVL_GETBUFFER) << "getBufferForSequenceID obtained sequence_id_mutex for seqid=" << seqID;

	// First look for a buffer this manager already owns for this sequence ID.
	auto buffers = GetBuffersOwnedByManager();
	for (auto& buf : buffers)
	{
		auto hdr = getEventHeader_(buf);
		if (hdr->sequence_id == seqID)
		{
			TLOG(TLVL_GETBUFFER) << "getBufferForSequenceID " << seqID << " returning " << buf;
			return buf;
		}
	}

#if !ART_SUPPORTS_DUPLICATE_EVENTS
	// Refuse to open a second buffer for an event that has already gone to art.
	if (released_incomplete_events_.count(seqID) != 0u)
	{
		TLOG(TLVL_ERROR) << "Event " << seqID << " has already been marked \"Incomplete\" and sent to art!";
		return -2;
	}
	if (released_events_.count(seqID) != 0u)
	{
		TLOG(TLVL_ERROR) << "Event " << seqID << " has already been completed and released to art! Check configuration for inconsistent Fragment count per event!";
		return -2;
	}
#endif

	if (!create_new)
	{
		return -1;
	}

	// Release/time out pending buffers first so they can be reused, then try a non-overwriting
	// allocation before falling back to the configured overwrite mode.
	check_pending_buffers_(lk);
	int new_buffer = GetBufferForWriting(false);

	if (new_buffer == -1)
	{
		new_buffer = GetBufferForWriting(overwrite_mode_);
	}

	if (new_buffer == -1)
	{
		return -1;
	}
	TLOG(TLVL_BUFLCK) << "getBufferForSequenceID_: obtaining buffer_mutexes lock for buffer " << new_buffer;
	std::unique_lock<std::mutex> buffer_lk(buffer_mutexes_.at(new_buffer));
	TLOG(TLVL_BUFLCK) << "getBufferForSequenceID_: obtained buffer_mutexes lock for buffer " << new_buffer;

	// Start the event-building timer used for timeout and "Average Event Building Time" reporting.
	event_timing_[new_buffer] = std::chrono::steady_clock::now();

	// Initialize the RawEventHeader at the start of the buffer, then advance the write position past it.
	auto hdr = getEventHeader_(new_buffer);
	hdr->is_complete = false;
	hdr->run_id = run_id_;
	hdr->subrun_id = GetSubrunForSequenceID(seqID);
	hdr->event_id = use_sequence_id_for_event_number_ ? static_cast<uint32_t>(seqID) : static_cast<uint32_t>(timestamp);
	hdr->sequence_id = seqID;
	hdr->timestamp = timestamp;
	buffer_writes_pending_[new_buffer] = 0;
	IncrementWritePos(new_buffer, sizeof(detail::RawEventHeader));
	Globals::SetMFIteration("Sequence ID " + std::to_string(seqID));

	TLOG(TLVL_BUFFER) << "getBufferForSequenceID placing " << new_buffer << " to active.";
	active_buffers_.insert(new_buffer);
	TLOG(TLVL_BUFFER) << "Buffer occupancy now (total,full,reading,empty,pending,active)=("
	                  << size() << ","
	                  << ReadReadyCount() << ","
	                  << WriteReadyCount(true) - WriteReadyCount(false) - ReadReadyCount() << ","
	                  << WriteReadyCount(false) << ","
	                  << pending_buffers_.size() << ","
	                  << active_buffers_.size() << ")";

	if (requests_)
	{
		requests_->AddRequest(seqID, timestamp);
	}
	TLOG(TLVL_GETBUFFER) << "getBufferForSequenceID " << seqID << " returning newly initialized buffer " << new_buffer;
	return new_buffer;
}
1209 :
1210 0 : bool artdaq::SharedMemoryEventManager::hasFragments_(int buffer)
1211 : {
1212 0 : if (buffer == -1)
1213 : {
1214 0 : return true;
1215 : }
1216 0 : if (!CheckBuffer(buffer, BufferSemaphoreFlags::Writing))
1217 : {
1218 0 : return true;
1219 : }
1220 0 : ResetReadPos(buffer);
1221 0 : IncrementReadPos(buffer, sizeof(detail::RawEventHeader));
1222 0 : return MoreDataInBuffer(buffer);
1223 : }
1224 :
1225 2095 : void artdaq::SharedMemoryEventManager::complete_buffer_(int buffer)
1226 : {
1227 2095 : auto hdr = getEventHeader_(buffer);
1228 2095 : if (hdr != nullptr && hdr->is_complete)
1229 : {
1230 876 : TLOG(TLVL_COMPLETEBUFFER) << "complete_buffer_: This fragment completes event " << hdr->sequence_id << ".";
1231 :
1232 : {
1233 876 : TLOG(TLVL_BUFFER) << "complete_buffer_ moving " << buffer << " from active to pending.";
1234 :
1235 876 : TLOG(TLVL_BUFLCK) << "complete_buffer_: obtaining sequence_id_mutex lock for seqid=" << hdr->sequence_id;
1236 438 : std::unique_lock<std::mutex> lk(sequence_id_mutex_);
1237 876 : TLOG(TLVL_BUFLCK) << "complete_buffer_: obtained sequence_id_mutex lock for seqid=" << hdr->sequence_id;
1238 438 : active_buffers_.erase(buffer);
1239 438 : pending_buffers_.insert(buffer);
1240 438 : released_events_.insert(hdr->sequence_id);
1241 538 : while (released_events_.size() > max_event_list_length_)
1242 : {
1243 100 : released_events_.erase(released_events_.begin());
1244 : }
1245 :
1246 876 : TLOG(TLVL_BUFFER) << "Buffer occupancy now (total,full,reading,empty,pending,active)=("
1247 0 : << size() << ","
1248 0 : << ReadReadyCount() << ","
1249 0 : << WriteReadyCount(true) - WriteReadyCount(false) - ReadReadyCount() << ","
1250 0 : << WriteReadyCount(false) << ","
1251 0 : << pending_buffers_.size() << ","
1252 438 : << active_buffers_.size() << ")";
1253 438 : check_pending_buffers_(lk);
1254 438 : }
1255 438 : if (requests_)
1256 : {
1257 438 : requests_->RemoveRequest(hdr->sequence_id);
1258 : }
1259 : }
1260 2095 : check_pending_broadcasts_();
1261 2095 : }
1262 :
1263 0 : bool artdaq::SharedMemoryEventManager::bufferComparator(int bufA, int bufB)
1264 : {
1265 0 : return getEventHeader_(bufA) < getEventHeader_(bufB);
1266 : }
1267 :
1268 10 : void artdaq::SharedMemoryEventManager::CheckPendingBuffers()
1269 : {
1270 : {
1271 20 : TLOG(TLVL_BUFLCK) << "Obtaining sequence_id_mutex_";
1272 10 : std::unique_lock<std::mutex> lk(sequence_id_mutex_);
1273 20 : TLOG(TLVL_BUFLCK) << "Obtained sequence_id_mutex_";
1274 :
1275 10 : check_pending_buffers_(lk);
1276 10 : }
1277 10 : check_pending_broadcasts_();
1278 10 : }
1279 :
// Advance event buffers toward art and publish buffer metrics.
//   1. For every buffer owned by the manager for which ResetBuffer() returns true and which is not
//      already pending: if it is active with no writes in progress, treat it as an incomplete
//      (timed-out) event. Move it to pending and record how many Fragments it is missing.
//      NOTE(review): ResetBuffer()==true is taken here to mean "buffer timed out"; confirm
//      against SharedMemoryManager.
//   2. Release pending buffers to art (MarkBufferFull), sorted with bufferComparator. Events from a
//      later subrun are held while buffers are available or the subrun hold time has not expired.
//   3. Send routing tokens for free buffers beyond those already covered by outstanding tokens.
//   4. Emit statistics and metrics.
// `lock` is only inspected for logging. Every caller in this file passes a lock on
// sequence_id_mutex_.
void artdaq::SharedMemoryEventManager::check_pending_buffers_(std::unique_lock<std::mutex> const& lock)
{
	TLOG(TLVL_CHECKPENDINGBUFFERS) << "check_pending_buffers_ BEGIN Locked=" << std::boolalpha << lock.owns_lock();

	auto buffers = GetBuffersOwnedByManager();
	for (auto buf : buffers)
	{
		if (ResetBuffer(buf) && (pending_buffers_.count(buf) == 0u))
		{
			TLOG(TLVL_CHECKPENDINGBUFFERS) << "check_pending_buffers_ Incomplete buffer detected, buf=" << buf << " active_bufers_.count(buf)=" << active_buffers_.count(buf) << " buffer_writes_pending_[buf]=" << buffer_writes_pending_[buf].load();
			auto hdr = getEventHeader_(buf);
			// Only release it if no Fragment write into this buffer is still in progress.
			if ((active_buffers_.count(buf) != 0u) && buffer_writes_pending_[buf].load() == 0)
			{
				if (requests_)
				{
					requests_->RemoveRequest(hdr->sequence_id);
				}
				TLOG(TLVL_BUFFER) << "check_pending_buffers_ moving buffer " << buf << " from active to pending";
				active_buffers_.erase(buf);
				pending_buffers_.insert(buf);
				TLOG(TLVL_BUFFER) << "Buffer occupancy now (total,full,reading,empty,pending,active)=("
				                  << size() << ","
				                  << ReadReadyCount() << ","
				                  << WriteReadyCount(true) - WriteReadyCount(false) - ReadReadyCount() << ","
				                  << WriteReadyCount(false) << ","
				                  << pending_buffers_.size() << ","
				                  << active_buffers_.size() << ")";

				run_incomplete_event_count_++;
				if (metricMan)
				{
					metricMan->sendMetric("Incomplete Event Rate", 1, "events/s", 3, MetricMode::Rate);
				}
				// Track the number of Fragments still missing for this sequence ID.
				if (released_incomplete_events_.count(hdr->sequence_id) == 0u)
				{
					released_incomplete_events_[hdr->sequence_id] = num_fragments_per_event_ - GetFragmentCountInBuffer(buf);
				}
				else
				{
					released_incomplete_events_[hdr->sequence_id] -= GetFragmentCountInBuffer(buf);
				}

				TLOG(TLVL_WARNING) << "Event " << hdr->sequence_id
				                   << " was opened " << TimeUtils::GetElapsedTime(event_timing_[buf]) << " s ago"
				                   << " and has timed out (missing " << released_incomplete_events_[hdr->sequence_id] << " Fragments)."
				                   << "Scheduling release to art.";
			}
		}
	}

	// Release pending events in bufferComparator order.
	std::list<int> sorted_buffers(pending_buffers_.begin(), pending_buffers_.end());
	sorted_buffers.sort([this](int a, int b) { return bufferComparator(a, b); });

	auto available_buffers = WriteReadyCount(overwrite_mode_);
	auto counter = 0;
	double eventSize = 0;
	double eventTime = 0;
	for (auto buf : sorted_buffers)
	{
		auto hdr = getEventHeader_(buf);
		auto thisEventSize = BufferDataSize(buf);

		// Optionally re-label events from an earlier subrun with the current subrun.
		if (update_run_ids_ && hdr->subrun_id < subrun_id_)
		{
			hdr->subrun_id = subrun_id_;
		}
		bool currentSubrun = hdr->subrun_id == subrun_id_;

		// Hold events belonging to a future subrun while buffers remain available, or while the
		// subrun transition hold time since the last current-subrun event has not yet elapsed.
		if (hdr->subrun_id > subrun_id_ && (available_buffers > 0 || TimeUtils::GetElapsedTime(last_event_time_) < subrun_transition_hold_time_s_))
		{
			TLOG(TLVL_CHECKPENDINGBUFFERS_4) << "Holding event " << std::to_string(hdr->sequence_id) << " (sr=" << hdr->subrun_id << ") in buffer " << buf << ", "
			                                 << "event_size=" << thisEventSize << ", buffer_size=" << BufferSize();
			continue;
		}

		TLOG(TLVL_CHECKPENDINGBUFFERS_4) << "Releasing event " << std::to_string(hdr->sequence_id) << " (sr=" << hdr->subrun_id << ") in buffer " << buf << " to art, "
		                                 << "event_size=" << thisEventSize << ", buffer_size=" << BufferSize();
		statsHelper_.addSample(EVENTS_RELEASED_STAT_KEY, thisEventSize);

		TLOG(TLVL_BUFFER) << "check_pending_buffers_ removing buffer " << buf << " moving from pending to full";
		MarkBufferFull(buf);
		run_event_count_++;
		counter++;
		eventSize += thisEventSize;
		eventTime += TimeUtils::GetElapsedTime(event_timing_[buf]);
		pending_buffers_.erase(buf);
		if (currentSubrun)
		{
			last_event_time_ = std::chrono::steady_clock::now();
		}
	}
	TLOG(TLVL_BUFFER) << "Buffer occupancy now (total,full,reading,empty,pending,active)=("
	                  << size() << ","
	                  << ReadReadyCount() << ","
	                  << WriteReadyCount(true) - WriteReadyCount(false) - ReadReadyCount() << ","
	                  << WriteReadyCount(false) << ","
	                  << pending_buffers_.size() << ","
	                  << active_buffers_.size() << ")";

	// Outstanding tokens = tokens sent so far minus events released this run. Send one token per
	// writable buffer that is not already covered by an outstanding token.
	// NOTE(review): if these counters are unsigned, the subtraction below can wrap; confirm types.
	if (tokens_ && tokens_->RoutingTokenSendsEnabled())
	{
		TLOG(TLVL_CHECKPENDINGBUFFERS_3) << "Sent tokens: " << tokens_->GetSentTokenCount() << ", Event count: " << run_event_count_;
		auto outstanding_tokens = tokens_->GetSentTokenCount() - run_event_count_;

		TLOG(TLVL_CHECKPENDINGBUFFERS_3) << "check_pending_buffers_: outstanding_tokens: " << outstanding_tokens << ", available_buffers: " << available_buffers
		                                 << ", tokens_to_send: " << available_buffers - outstanding_tokens;

		if (available_buffers > outstanding_tokens)
		{
			auto tokens_to_send = available_buffers - outstanding_tokens;

			while (tokens_to_send > 0)
			{
				TLOG(TLVL_CHECKPENDINGBUFFERS_3) << "check_pending_buffers_: Sending a Routing Token";
				tokens_->SendRoutingToken(1, run_id_);
				tokens_to_send--;
			}
		}
	}

	if (statsHelper_.readyToReport())
	{
		std::string statString = buildStatisticsString_();
		TLOG(TLVL_INFO) << statString;
	}

	if (metricMan)
	{
		TLOG(TLVL_CHECKPENDINGBUFFERS_2) << "check_pending_buffers_: Sending Metrics";
		metricMan->sendMetric("Event Rate", counter, "Events", 1, MetricMode::Rate);
		metricMan->sendMetric("Data Rate", eventSize, "Bytes", 1, MetricMode::Rate);
		if (counter > 0)
		{
			metricMan->sendMetric("Average Event Size", eventSize / counter, "Bytes", 1, MetricMode::Average);
			metricMan->sendMetric("Average Event Building Time", eventTime / counter, "s", 1, MetricMode::Average);
		}

		metricMan->sendMetric("Events Released to art this run", run_event_count_, "Events", 1, MetricMode::LastPoint);
		metricMan->sendMetric("Incomplete Events Released to art this run", run_incomplete_event_count_, "Events", 1, MetricMode::LastPoint);
		if (tokens_ && tokens_->RoutingTokenSendsEnabled())
		{
			metricMan->sendMetric("Tokens sent", tokens_->GetSentTokenCount(), "Tokens", 2, MetricMode::LastPoint);
		}

		// Tally shared-memory buffers by semaphore state.
		auto bufferReport = GetBufferReport();
		int full = 0, empty = 0, writing = 0, reading = 0;
		for (auto& buf : bufferReport)
		{
			switch (buf.second)
			{
				case BufferSemaphoreFlags::Full:
					full++;
					break;
				case BufferSemaphoreFlags::Empty:
					empty++;
					break;
				case BufferSemaphoreFlags::Writing:
					writing++;
					break;
				case BufferSemaphoreFlags::Reading:
					reading++;
					break;
			}
		}
		auto total = size();
		TLOG(TLVL_CHECKPENDINGBUFFERS_2) << "Buffer usage: full=" << full << ", empty=" << empty << ", writing=" << writing << ", reading=" << reading << ", total=" << total;

		metricMan->sendMetric("Shared Memory Full Buffers", full, "buffers", 2, MetricMode::LastPoint);
		metricMan->sendMetric("Shared Memory Available Buffers", empty, "buffers", 2, MetricMode::LastPoint);
		metricMan->sendMetric("Shared Memory Pending Buffers", writing, "buffers", 2, MetricMode::LastPoint);
		metricMan->sendMetric("Shared Memory Reading Buffers", reading, "buffers", 2, MetricMode::LastPoint);
		if (total > 0)
		{
			metricMan->sendMetric("Shared Memory Full %", full * 100 / static_cast<double>(total), "%", 2, MetricMode::LastPoint);
			metricMan->sendMetric("Shared Memory Available %", empty * 100 / static_cast<double>(total), "%", 2, MetricMode::LastPoint);
		}
	}
	TLOG(TLVL_CHECKPENDINGBUFFERS) << "check_pending_buffers_ END";
}
1459 :
1460 0 : void artdaq::SharedMemoryEventManager::BroadcastFragment(FragmentPtr& frag)
1461 : {
1462 : {
1463 0 : std::lock_guard<std::mutex> lk(broadcast_mutex_);
1464 :
1465 0 : bool entry_found = false;
1466 0 : for (auto& entry : pending_broadcasts_)
1467 : {
1468 0 : if (entry.type == frag->type() && entry.sequence_id == frag->sequenceID())
1469 : {
1470 0 : TLOG(TLVL_BROADCASTFRAGMENT) << "Received BroadcastFragment of type " << static_cast<int>(frag->type()) << ", seqID " << frag->sequenceID() << " matching current pending_broadcasts_ entry. frags=" << entry.fragments.size() + 1 << "/" << init_fragment_count_;
1471 0 : entry.fragments.push_back(std::move(frag));
1472 0 : entry_found = true;
1473 0 : break;
1474 : }
1475 : }
1476 0 : if (!entry_found)
1477 : {
1478 0 : TLOG(TLVL_BROADCASTFRAGMENT) << "Received BroadcastFragment of type " << static_cast<int>(frag->type()) << ", seqID " << frag->sequenceID() << ", creating new pending_broadcasts_ entry";
1479 0 : pending_broadcasts_.emplace_back();
1480 0 : pending_broadcasts_.back().deadline = std::chrono::steady_clock::now() + std::chrono::microseconds(GetBufferTimeout());
1481 0 : pending_broadcasts_.back().type = frag->type();
1482 0 : pending_broadcasts_.back().sequence_id = frag->sequenceID();
1483 0 : pending_broadcasts_.back().subrun_id = GetSubrunForSequenceID(frag->sequenceID());
1484 0 : pending_broadcasts_.back().fragments.push_back(std::move(frag));
1485 : }
1486 0 : }
1487 0 : check_pending_broadcasts_();
1488 0 : }
1489 :
1490 2105 : void artdaq::SharedMemoryEventManager::check_pending_broadcasts_()
1491 : {
1492 2105 : std::lock_guard<std::mutex> lk(broadcast_mutex_);
1493 :
1494 2105 : auto entry = pending_broadcasts_.begin();
1495 2105 : auto now = std::chrono::steady_clock::now();
1496 2105 : while (entry != pending_broadcasts_.end())
1497 : {
1498 0 : if (running_)
1499 : {
1500 0 : if ((!init_frags_sent_ || entry->fragments.size() == 0 || (entry->fragments.size() < init_fragment_count_ && now < entry->deadline)))
1501 : {
1502 0 : entry++;
1503 0 : continue;
1504 : }
1505 0 : if (entry->fragments.front()->type() == Fragment::EndOfSubrunFragmentType || entry->fragments.front()->type() == artdaq::Fragment::SubrunDataFragmentType)
1506 : {
1507 0 : if (entry->subrun_id == subrun_id_ && TimeUtils::GetElapsedTime(last_event_time_) < subrun_transition_hold_time_s_)
1508 : {
1509 0 : TLOG(TLVL_CHECKPENDINGBROADCASTS) << "Holding entry size = " << entry->fragments.size() << " / " << init_fragment_count_ << ", lead SeqID = " << entry->fragments.front()->sequenceID() << ", subrun=" << entry->subrun_id << " because it is EndOfSubrun and hold time has not expired";
1510 0 : entry++;
1511 0 : continue;
1512 0 : }
1513 : }
1514 : }
1515 :
1516 0 : TLOG(TLVL_CHECKPENDINGBROADCASTS) << "Broadcasting entry init_frags_sent_=" << init_frags_sent_ << ", size=" << entry->fragments.size() << "/" << init_fragment_count_ << ", subrun=" << entry->subrun_id << ", lead SeqID=" << entry->fragments.front()->sequenceID() << " deadline delta=" << std::chrono::duration_cast<std::chrono::microseconds>(now - entry->deadline).count() << " us";
1517 0 : broadcastFragments_(entry->fragments);
1518 0 : entry = pending_broadcasts_.erase(entry);
1519 : }
1520 2105 : }
1521 :
1522 22 : bool artdaq::SharedMemoryEventManager::broadcastFragments_(FragmentPtrs& frags)
1523 : {
1524 22 : if (frags.empty())
1525 : {
1526 0 : TLOG(TLVL_ERROR) << "Requested broadcast but no Fragments given!";
1527 0 : return false;
1528 : }
1529 22 : if (!broadcasts_.IsValid())
1530 : {
1531 0 : TLOG(TLVL_ERROR) << "Broadcast attempted but broadcast shared memory is unavailable!";
1532 0 : return false;
1533 : }
1534 44 : TLOG(TLVL_BROADCASTFRAGMENTS) << "Broadcasting " << frags.size() << " Fragments with lead seqID=" << frags.front()->sequenceID()
1535 0 : << ", type " << detail::RawFragmentHeader::SystemTypeToString(frags.front()->type())
1536 22 : << ", size=" << frags.front()->sizeBytes() << "B.";
1537 22 : auto buffer = broadcasts_.GetBufferForWriting(false);
1538 44 : TLOG(TLVL_BROADCASTFRAGMENTS) << "broadcastFragments_: after getting buffer 1st buffer=" << buffer;
1539 22 : auto start_time = std::chrono::steady_clock::now();
1540 22 : while (buffer == -1 && TimeUtils::GetElapsedTimeMilliseconds(start_time) < static_cast<size_t>(broadcast_timeout_ms_))
1541 : {
1542 0 : usleep(10000);
1543 0 : buffer = broadcasts_.GetBufferForWriting(true); // Go into overwrite mode
1544 : }
1545 44 : TLOG(TLVL_BROADCASTFRAGMENTS) << "broadcastFragments_: after getting buffer w/timeout, buffer=" << buffer << ", elapsed time=" << TimeUtils::GetElapsedTime(start_time) << " s.";
1546 22 : if (buffer == -1)
1547 : {
1548 0 : TLOG(TLVL_ERROR) << "Broadcast of fragment type " << frags.front()->typeString() << " failed due to timeout waiting for buffer!";
1549 0 : return false;
1550 : }
1551 :
1552 44 : TLOG(TLVL_BROADCASTFRAGMENTS) << "broadcastFragments_: Filling in RawEventHeader";
1553 22 : auto hdr = reinterpret_cast<detail::RawEventHeader*>(broadcasts_.GetBufferStart(buffer)); // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
1554 22 : hdr->run_id = run_id_;
1555 22 : hdr->subrun_id = GetSubrunForSequenceID(frags.front()->sequenceID());
1556 22 : hdr->sequence_id = frags.front()->sequenceID();
1557 22 : hdr->is_complete = true;
1558 22 : broadcasts_.IncrementWritePos(buffer, sizeof(detail::RawEventHeader));
1559 :
1560 22 : if (frags.front()->type() == artdaq::Fragment::EndOfSubrunFragmentType || frags.front()->type() == artdaq::Fragment::SubrunDataFragmentType)
1561 : {
1562 0 : subrun_id_ = hdr->subrun_id + 1;
1563 : }
1564 :
1565 44 : for (auto& frag : frags)
1566 : {
1567 22 : if (frag->sequenceID() != hdr->sequence_id || frag->type() != frags.front()->type())
1568 : {
1569 0 : TLOG(TLVL_WARNING) << "Skipping fragment due to Type/SeqID mismatch! seqID=" << frag->sequenceID() << " (expected " << hdr->sequence_id << "), type=" << static_cast<int>(frag->type()) << " (" << static_cast<int>(frags.front()->type()) << ")";
1570 0 : continue;
1571 0 : }
1572 44 : TLOG(TLVL_BROADCASTFRAGMENTS_2) << "broadcastFragments_ before Fragment Write call seqID=" << frag->sequenceID() << ", fragID=" << frag->fragmentID() << ", type=" << static_cast<int>(frag->type());
1573 22 : broadcasts_.Write(buffer, frag->headerAddress(), frag->size() * sizeof(RawDataType));
1574 : }
1575 :
1576 44 : TLOG(TLVL_BROADCASTFRAGMENTS) << "broadcastFragments_ Marking buffer full";
1577 22 : broadcasts_.MarkBufferFull(buffer, -1);
1578 44 : TLOG(TLVL_BROADCASTFRAGMENTS) << "broadcastFragments_ Complete";
1579 22 : return true;
1580 : }
1581 :
1582 0 : std::vector<char*> artdaq::SharedMemoryEventManager::parse_art_command_line_(const std::shared_ptr<art_config_file>& config_file, size_t process_index)
1583 : {
1584 0 : auto offset_index = process_index + art_process_index_offset_;
1585 0 : TLOG(TLVL_PARSEARTCOMMANDLINE) << "parse_art_command_line_: Parsing command line " << art_cmdline_ << ", config_file: " << config_file->getFileName() << ", index: " << process_index << " (w/offset: " << offset_index << ")";
1586 0 : std::string art_cmdline_tmp = art_cmdline_;
1587 0 : auto filenameit = art_cmdline_tmp.find("#CONFIG_FILE#");
1588 0 : if (filenameit != std::string::npos)
1589 : {
1590 0 : art_cmdline_tmp.replace(filenameit, 13, config_file->getFileName());
1591 : }
1592 0 : auto indexit = art_cmdline_tmp.find("#PROCESS_INDEX#");
1593 0 : if (indexit != std::string::npos)
1594 : {
1595 0 : art_cmdline_tmp.replace(indexit, 15, std::to_string(offset_index));
1596 : }
1597 0 : TLOG(TLVL_PARSEARTCOMMANDLINE) << "parse_art_command_line_: After replacing index and config parameters, command line is " << art_cmdline_tmp;
1598 :
1599 0 : std::istringstream iss(art_cmdline_tmp);
1600 0 : auto tokens = std::vector<std::string>{std::istream_iterator<std::string>{iss}, std::istream_iterator<std::string>{}};
1601 0 : std::vector<char*> output;
1602 :
1603 0 : for (auto& token : tokens)
1604 : {
1605 0 : TLOG(TLVL_PARSEARTCOMMANDLINE) << "parse_art_command_line_: Adding cmdline token " << token << " to output list";
1606 0 : output.emplace_back(new char[token.length() + 1]);
1607 0 : memcpy(output.back(), token.c_str(), token.length());
1608 0 : output.back()[token.length()] = '\0'; // NOLINT(cppcoreguidelines-pro-bounds-pointer-arithmetic)
1609 : }
1610 0 : output.emplace_back(nullptr);
1611 :
1612 0 : return output;
1613 0 : }
1614 :
1615 6 : void artdaq::SharedMemoryEventManager::send_init_frags_()
1616 : {
1617 6 : std::lock_guard<std::mutex> lk(init_fragments_mutex_);
1618 6 : if (init_fragment_map_size_() >= init_fragment_count_ && init_fragment_count_ > 0)
1619 : {
1620 0 : TLOG(TLVL_INFO) << "Broadcasting " << init_fragment_map_size_() << " Init Fragment(s) to all art subprocesses...";
1621 :
1622 0 : FragmentPtrs init_fragments;
1623 0 : for (auto& fragment_id_pair : init_fragment_map_)
1624 : {
1625 0 : for (auto& ts_pair : fragment_id_pair.second)
1626 : {
1627 0 : init_fragments.emplace_back(std::make_unique<Fragment>(*ts_pair.second));
1628 : }
1629 : }
1630 :
1631 0 : broadcastFragments_(init_fragments);
1632 0 : TLOG(TLVL_SENDINIT) << "Init Fragment sent";
1633 0 : init_frags_sent_ = true;
1634 0 : }
1635 6 : else if (init_fragment_count_ > 0 && init_fragment_map_size_() == 0)
1636 : {
1637 0 : TLOG(TLVL_WARNING) << "Cannot send Init Fragment(s) because I haven't yet received them! Set send_init_fragments to false or init_fragment_count to 0 if this process does not receive serialized art events to avoid potentially lengthy timeouts!";
1638 : }
1639 6 : else if (init_fragment_count_ > 0)
1640 : {
1641 0 : TLOG(TLVL_INFO) << "Cannot send Init Fragment(s) because I haven't yet received them (have " << init_fragment_map_size_() << " of " << init_fragment_count_ << ")!";
1642 : }
1643 : else
1644 : {
1645 : // Send an empty Init Fragment so that ArtdaqInput knows that this is a pure-Fragment input
1646 6 : artdaq::FragmentPtrs begin_run_fragments_;
1647 6 : begin_run_fragments_.emplace_back(new artdaq::Fragment());
1648 6 : begin_run_fragments_.back()->setSystemType(artdaq::Fragment::InitFragmentType);
1649 6 : broadcastFragments_(begin_run_fragments_);
1650 6 : init_frags_sent_ = true;
1651 6 : }
1652 6 : }
1653 :
1654 0 : void artdaq::SharedMemoryEventManager::AddInitFragment(FragmentPtr& frag)
1655 : {
1656 0 : std::unique_lock<std::mutex> lk(init_fragments_mutex_);
1657 :
1658 0 : auto fragId = frag->fragmentID();
1659 0 : auto ts = frag->timestamp();
1660 :
1661 0 : init_fragment_map_[fragId][ts] = std::move(frag);
1662 0 : TLOG(TLVL_ADDINITFRAGMENT) << "Received Init Fragment from rank " << fragId << ", art process id " << ts << ". Now have " << init_fragment_map_size_() << " of " << init_fragment_count_;
1663 :
1664 : // Don't send until all init fragments have been received
1665 0 : if (init_fragment_map_size_() >= init_fragment_count_)
1666 : {
1667 0 : lk.unlock();
1668 0 : send_init_frags_();
1669 : }
1670 0 : }
1671 :
1672 6 : size_t artdaq::SharedMemoryEventManager::init_fragment_map_size_() const
1673 : {
1674 6 : size_t size = 0;
1675 :
1676 6 : for (auto& frag_id_pair : init_fragment_map_)
1677 : {
1678 0 : size += frag_id_pair.second.size();
1679 : }
1680 :
1681 6 : return size;
1682 : }
1683 :
1684 0 : void artdaq::SharedMemoryEventManager::UpdateArtConfiguration(fhicl::ParameterSet art_pset)
1685 : {
1686 0 : TLOG(TLVL_UPDATEARTCONFIG) << "UpdateArtConfiguration BEGIN";
1687 0 : if (art_pset != current_art_pset_ || !current_art_config_file_)
1688 : {
1689 0 : current_art_pset_ = art_pset;
1690 0 : if (manual_art_)
1691 0 : current_art_config_file_ = std::make_shared<art_config_file>(art_pset, GetKey(), GetBroadcastKey());
1692 : else
1693 0 : current_art_config_file_ = std::make_shared<art_config_file>(art_pset);
1694 : }
1695 0 : TLOG(TLVL_UPDATEARTCONFIG) << "UpdateArtConfiguration END";
1696 0 : }
1697 :
1698 0 : std::string artdaq::SharedMemoryEventManager::buildStatisticsString_() const
1699 : {
1700 0 : std::ostringstream oss;
1701 0 : oss << app_name << " statistics:" << std::endl;
1702 :
1703 : artdaq::MonitoredQuantityPtr mqPtr =
1704 0 : artdaq::StatisticsCollection::getInstance().getMonitoredQuantity(EVENTS_RELEASED_STAT_KEY);
1705 0 : if (mqPtr.get() != nullptr)
1706 : {
1707 0 : artdaq::MonitoredQuantityStats stats;
1708 0 : mqPtr->getStats(stats);
1709 0 : oss << " Event statistics: " << stats.recentSampleCount << " events released at " << stats.recentSampleRate
1710 0 : << " events/sec, effective data rate = "
1711 0 : << (stats.recentValueRate / 1024.0 / 1024.0)
1712 0 : << " MB/sec, monitor window = " << stats.recentDuration
1713 0 : << " sec, min::max event size = " << (stats.recentValueMin / 1024.0 / 1024.0)
1714 0 : << "::" << (stats.recentValueMax / 1024.0 / 1024.0) << " MB" << std::endl;
1715 0 : if (stats.recentSampleRate > 0.0)
1716 : {
1717 0 : oss << " Average time per event: ";
1718 0 : oss << " elapsed time = " << (1.0 / stats.recentSampleRate) << " sec" << std::endl;
1719 : }
1720 0 : }
1721 :
1722 0 : mqPtr = artdaq::StatisticsCollection::getInstance().getMonitoredQuantity(FRAGMENTS_RECEIVED_STAT_KEY);
1723 0 : if (mqPtr.get() != nullptr)
1724 : {
1725 0 : artdaq::MonitoredQuantityStats stats;
1726 0 : mqPtr->getStats(stats);
1727 0 : oss << " Fragment statistics: " << stats.recentSampleCount << " fragments received at " << stats.recentSampleRate
1728 0 : << " fragments/sec, effective data rate = "
1729 0 : << (stats.recentValueRate / 1024.0 / 1024.0)
1730 0 : << " MB/sec, monitor window = " << stats.recentDuration
1731 0 : << " sec, min::max fragment size = " << (stats.recentValueMin / 1024.0 / 1024.0)
1732 0 : << "::" << (stats.recentValueMax / 1024.0 / 1024.0) << " MB" << std::endl;
1733 0 : }
1734 :
1735 0 : oss << " Event counts: Run -- " << run_event_count_ << " Total, " << run_incomplete_event_count_ << " Incomplete."
1736 0 : << " Subrun -- " << subrun_event_count_ << " Total, " << subrun_incomplete_event_count_ << " Incomplete. "
1737 0 : << std::endl;
1738 : //-----------------------------------------------------------------------------
1739 : // P.Murat: add statistics on the SHM buffers
1740 : // there are 4 different flags: 0:empty, 1:writing; 2:full 3:reading
1741 : // want statistics on all of them
1742 : //-----------------------------------------------------------------------------
1743 : // auto = std::vector<std::pair<int, artdaq::SharedMemoryManager::BufferSemaphoreFlags>>
1744 0 : artdaq::SharedMemoryEventManager* nc_this = (artdaq::SharedMemoryEventManager*)this;
1745 :
1746 0 : int bsize = nc_this->BufferSize();
1747 :
1748 0 : auto v = nc_this->GetBufferReport();
1749 :
1750 0 : int nbb[4] = {0, 0, 0, 0};
1751 :
1752 0 : int nbuff = v.size();
1753 0 : for (int i = 0; i < nbuff; i++)
1754 : {
1755 0 : auto x = v[i];
1756 0 : int flag = (int)x.second;
1757 0 : nbb[flag]++;
1758 : }
1759 :
1760 0 : oss << " Shared Memory: "
1761 0 : << nbuff << " buffers of " << bsize << " B, "
1762 0 : << nbb[0] << " Empty, " << nbb[1] << " Writing, " << nbb[2] << " Full, " << nbb[3] << " reading"
1763 0 : << std::endl;
1764 :
1765 0 : return oss.str();
1766 0 : }
|