Line data Source code
1 : #ifndef ARTDAQ_DAQRATE_SHAREDMEMORYEVENTMANAGER_HH
2 : #define ARTDAQ_DAQRATE_SHAREDMEMORYEVENTMANAGER_HH
3 :
4 : #include "TRACE/tracemf.h" // Pre-empt TRACE/trace.h from Fragment.hh.
5 : #include "artdaq-core/Data/Fragment.hh"
6 :
7 : #include "artdaq-core/Core/SharedMemoryManager.hh"
8 : #include "artdaq-core/Data/RawEvent.hh"
9 : #include "artdaq-core/Utilities/configureMessageFacility.hh"
10 : #include "artdaq/DAQrate/StatisticsHelper.hh"
11 : #include "artdaq/DAQrate/detail/RequestSender.hh"
12 : #include "artdaq/DAQrate/detail/TokenSender.hh"
13 :
14 : #include "fhiclcpp/types/Atom.h"
15 : #include "fhiclcpp/types/Comment.h"
16 : #include "fhiclcpp/types/ConfigurationTable.h"
17 : #include "fhiclcpp/types/OptionalTable.h"
18 : #include "fhiclcpp/types/TableFragment.h"
19 :
20 : #define ART_SUPPORTS_DUPLICATE_EVENTS 0
21 :
22 : #include <sys/stat.h>
23 : #include <deque>
24 : #include <fstream>
25 : #include <iomanip>
26 : #include <set>
27 :
28 : namespace artdaq {
29 :
30 : /**
31 : * \brief art_config_file wraps a temporary file used to configure art
32 : */
33 : class art_config_file
34 : {
35 : public:
36 : /**
37 : * \brief art_config_file Constructor
38 : * \param ps ParameterSet to write to temporary file
39 : * \param shm_key Shared Memory key to use (if 0, child program will use parent PID to generate)
40 : * \param broadcast_key Shared Memory key to use for broadcasts (if 0, child program will use parent PID to generate)
41 : */
42 16 : explicit art_config_file(fhicl::ParameterSet ps, uint32_t shm_key = 0, uint32_t broadcast_key = 0)
43 16 : : dir_name_("/tmp/partition_" + std::to_string(Globals::GetPartitionNumber()))
44 16 : , file_name_(dir_name_ + "/artConfig_" + std::to_string(my_rank) + "_" + std::to_string(artdaq::TimeUtils::gettimeofday_us()) + ".fcl")
45 : {
46 16 : mkdir(dir_name_.c_str(), 0777); // Allowed to fail if directory already exists
47 :
48 16 : std::ofstream of(file_name_, std::ofstream::trunc);
49 16 : if (of.fail())
50 : {
51 : // Probably a permissions error...
52 0 : dir_name_ = "/tmp/partition_" + std::to_string(Globals::GetPartitionNumber()) + "_" + std::to_string(getuid());
53 0 : mkdir(dir_name_.c_str(), 0777); // Allowed to fail if directory already exists
54 0 : file_name_ = dir_name_ + "/artConfig_" + std::to_string(my_rank) + "_" + std::to_string(artdaq::TimeUtils::gettimeofday_us()) + ".fcl";
55 :
56 0 : of.open(file_name_, std::ofstream::trunc);
57 0 : if (of.fail())
58 : {
59 0 : TLOG(TLVL_ERROR, "ArtConfigFile") << "Failed to open configuration file after two attemps! ABORTING!";
60 0 : exit(46);
61 : }
62 : }
63 16 : of << ps.to_string();
64 :
65 60 : if (ps.has_key("services") && ps.has_key("services.message"))
66 : {
67 0 : auto existing_message_config = ps.get<fhicl::ParameterSet>("services.message");
68 0 : auto existing_destinations = existing_message_config.get<fhicl::ParameterSet>("destinations");
69 0 : auto generated_message_config = generateMessageFacilityConfiguration(mf::GetApplicationName().c_str(), true, false, "-art");
70 0 : auto generated_message_pset = fhicl::ParameterSet::make(generated_message_config);
71 0 : auto generated_destinations = generated_message_pset.get<fhicl::ParameterSet>("destinations");
72 0 : for (auto& dest : generated_destinations.get_pset_names())
73 : {
74 0 : existing_destinations.put(dest, generated_destinations.get<fhicl::ParameterSet>(dest));
75 0 : }
76 0 : existing_message_config.put_or_replace("destinations", existing_destinations);
77 0 : of << " services.message: { " + existing_message_config.to_string() << "} ";
78 0 : }
79 : else
80 : {
81 16 : of << " services.message: { " << generateMessageFacilityConfiguration(mf::GetApplicationName().c_str(), true, false, "-art") << "} ";
82 : }
83 :
84 16 : if (shm_key > 0 || broadcast_key > 0) TLOG(TLVL_INFO, "ArtConfigFile") << "Inserting Shared memory keys (0x" << std::hex << shm_key << ", 0x" << std::hex << broadcast_key << ") into source config";
85 16 : if (shm_key > 0) of << " source.shared_memory_key: 0x" << std::hex << shm_key;
86 16 : if (broadcast_key > 0) of << " source.broadcast_shared_memory_key: 0x" << std::hex << broadcast_key;
87 :
88 16 : of.flush();
89 16 : of.close();
90 16 : }
91 16 : ~art_config_file()
92 : {
93 16 : remove(file_name_.c_str());
94 16 : rmdir(dir_name_.c_str()); // Will only delete directory if no config files are left over
95 16 : }
96 : /**
97 : * \brief Get the path of the temporary file
98 : * \return The path of the temporary file
99 : */
100 6 : std::string getFileName() const { return file_name_; }
101 :
102 : private:
103 : art_config_file(art_config_file const&) = delete;
104 : art_config_file(art_config_file&&) = delete;
105 : art_config_file& operator=(art_config_file const&) = delete;
106 : art_config_file& operator=(art_config_file&&) = delete;
107 :
108 : std::string dir_name_;
109 : std::string file_name_;
110 : };
111 :
112 : /**
113 : * \brief The SharedMemoryEventManager is a SharedMemoryManger which tracks events as they are built
114 : */
115 : class SharedMemoryEventManager : public SharedMemoryManager
116 : {
117 : public:
118 : static const std::string FRAGMENTS_RECEIVED_STAT_KEY; ///< Key for Fragments Received MonitoredQuantity
119 : static const std::string EVENTS_RELEASED_STAT_KEY; ///< Key for the Events Released MonitoredQuantity
120 :
121 : typedef RawEvent::run_id_t run_id_t; ///< Copy RawEvent::run_id_t into local scope
122 : typedef RawEvent::subrun_id_t subrun_id_t; ///< Copy RawEvent::subrun_id_t into local scope
123 : typedef Fragment::sequence_id_t sequence_id_t; ///< Copy Fragment::sequence_id_t into local scope
124 : typedef std::map<sequence_id_t, RawEvent_ptr> EventMap; ///< An EventMap is a map of RawEvent_ptr objects, keyed by sequence ID
125 :
126 : /// <summary>
127 : /// Configuration of the SharedMemoryEventManager. May be used for parameter validation
128 : /// </summary>
129 : struct Config
130 : {
131 : /// "max_event_size_bytes" REQUIRED: Maximum event size(all Fragments), in bytes
132 : /// Either max_fragment_size_bytes or max_event_size_bytes must be specified
133 : fhicl::Atom<size_t> max_event_size_bytes{fhicl::Name{"max_event_size_bytes"}, fhicl::Comment{"Maximum event size (all Fragments), in bytes"}};
134 : /// "stale_buffer_timeout_usec" (Default: event_queue_wait_time * 1, 000, 000) : Maximum amount of time elapsed before a buffer is marked as abandoned.Time is reset each time an operation is performed on the buffer.
135 : fhicl::Atom<size_t> stale_buffer_timeout_usec{fhicl::Name{"stale_buffer_timeout_usec"}, fhicl::Comment{"Maximum amount of time elapsed before a buffer is marked as abandoned. Time is reset each time an operation is performed on the buffer."}, 5000000};
136 : /// "overwite_mode" (Default: false): Whether new data is allowed to overwrite buffers in the "Full" state
137 : fhicl::Atom<bool> overwrite_mode{fhicl::Name{"overwrite_mode"}, fhicl::Comment{"Whether buffers are allowed to be overwritten when safe (state == Full or Reading)"}, false};
138 : /// "restart_crashed_art_processes" (Default: true) : Whether to automatically restart art processes that fail for any reason
139 : fhicl::Atom<bool> restart_crashed_art_processes{fhicl::Name{"restart_crashed_art_processes"}, fhicl::Comment{"Whether to automatically restart art processes that fail for any reason"}, true};
140 : /// "shared_memory_key" (Default 0xBEE70000 + PID) : Key to use for shared memory access
141 : fhicl::Atom<uint32_t> shared_memory_key{fhicl::Name{"shared_memory_key"}, fhicl::Comment{"Key to use for shared memory access"}, 0xBEE70000 + getpid()};
142 : /// "buffer_count" REQUIRED: Number of events in the Shared Memory(incomplete + pending art)
143 : fhicl::Atom<size_t> buffer_count{fhicl::Name{"buffer_count"}, fhicl::Comment{"Number of events in the Shared Memory (incomplete + pending art)"}};
144 : /// "max_fragment_size_bytes" REQURIED: Maximum Fragment size, in bytes
145 : /// Either max_fragment_size_bytes or max_event_size_bytes must be specified
146 : fhicl::Atom<size_t> max_fragment_size_bytes{fhicl::Name{"max_fragment_size_bytes"}, fhicl::Comment{" Maximum Fragment size, in bytes"}};
147 : /// "event_queue_wait_time" (Default: 5) : Amount of time(in seconds) an event can exist in shared memory before being released to art.Used as input to default parameter of "stale_buffer_timeout_usec".
148 : fhicl::Atom<size_t> event_queue_wait_time{fhicl::Name{"event_queue_wait_time"}, fhicl::Comment{"Amount of time (in seconds) an event can exist in shared memory before being released to art. Used as input to default parameter of \"stale_buffer_timeout_usec\"."}, 5};
149 : /// "broadcast_mode" (Default: false) : When true, buffers are not marked Empty when read, but return to Full state.Buffers are overwritten in order received.
150 : fhicl::Atom<bool> broadcast_mode{fhicl::Name{"broadcast_mode"}, fhicl::Comment{"When true, buffers are not marked Empty when read, but return to Full state. Buffers are overwritten in order received."}, false};
151 : /// "art_analyzer_count" (Default: 1) : Number of art procceses to start
152 : fhicl::Atom<size_t> art_analyzer_count{fhicl::Name{"art_analyzer_count"}, fhicl::Comment{"Number of art procceses to start"}, 1};
153 : /// "expected_fragments_per_event" (REQUIRED) : Number of Fragments to expect per event
154 : fhicl::Atom<size_t> expected_fragments_per_event{fhicl::Name{"expected_fragments_per_event"}, fhicl::Comment{"Number of Fragments to expect per event"}};
155 : /// "maximum_oversize_fragment_count" (Default: 1): Maximum number of over-size Fragments to drop before throwing an exception. Default is 1, which means to throw an exception if any over-size Fragments are dropped. Set to 0 to disable.
156 : fhicl::Atom<int> maximum_oversize_fragment_count{fhicl::Name{"maximum_oversize_fragment_count"}, fhicl::Comment{"Maximum number of over-size Fragments to drop before throwing an exception. Default is 1, which means to throw an exception if any over-size Fragments are dropped. Set to 0 to disable."}, 1};
157 : /// "update_run_ids_on_new_fragment" (Default: true) : Whether the run and subrun ID of an event should be updated whenever a Fragment is added.
158 : fhicl::Atom<bool> update_run_ids_on_new_fragment{fhicl::Name{"update_run_ids_on_new_fragment"}, fhicl::Comment{"Whether the run and subrun ID of an event should be updated whenever a Fragment is added."}, true};
159 : /// "use_sequence_id_for_event_number" (Default: true): Whether to use the artdaq Sequence ID (true) or the Timestamp (false) for art Event numbers
160 : fhicl::Atom<bool> use_sequence_id_for_event_number{fhicl::Name{"use_sequence_id_for_event_number"}, fhicl::Comment{"Whether to use the artdaq Sequence ID (true) or the Timestamp (false) for art Event numbers"}, true};
161 : /// "max_subrun_lookup_table_size" (Default: 100): The maximum number of entries to store in the sequence ID-SubRun ID lookup table
162 : fhicl::Atom<size_t> max_subrun_lookup_table_size{fhicl::Name{"max_subrun_lookup_table_size"}, fhicl::Comment{"The maximum number of entries to store in the sequence ID-SubRun ID lookup table"}, 100};
163 : /// "max_event_list_length" (Default: 100): The maximum number of entries to store in the released events list
164 : fhicl::Atom<size_t> max_event_list_length{fhicl::Name{"max_event_list_length"}, fhicl::Comment{" The maximum number of entries to store in the released events list"}, 100};
165 : /// "send_init_fragments" (Default: true): Whether Init Fragments are expected to be sent to art. If true, a Warning message is printed when an Init Fragment is requested but none are available.
166 : fhicl::Atom<bool> send_init_fragments{fhicl::Name{"send_init_fragments"}, fhicl::Comment{"Whether Init Fragments are expected to be sent to art. If true, a Warning message is printed when an Init Fragment is requested but none are available."}, true};
167 : /// "open_event_report_interval_ms" (Default: -1): Interval at which an open event report should be written
168 : fhicl::Atom<int> open_event_report_interval_ms{fhicl::Name{"open_event_report_interval_ms"}, fhicl::Comment{"Interval at which an open event report should be written"}, -1};
169 : /// "fragment_broadcast_timeout_ms" (Default: 3000): Amount of time broadcast fragments should live in the broadcast shared memory segment
170 : /// A "Broadcast shared memory segment" is used for all system-level fragments, such as Init, Start/End Run, Start/End Subrun and EndOfData
171 : fhicl::Atom<int> fragment_broadcast_timeout_ms{fhicl::Name{"fragment_broadcast_timeout_ms"}, fhicl::Comment{"Amount of time broadcast fragments should live in the broadcast shared memory segment"}, 3000};
172 : /// "art_command_line" (Default: "art -c \#CONFIG_FILE\#"): Command line used to start analysis processes. Supports two special sequences: \#CONFIG_FILE\# will be replaced with the fhicl config file. \#PROCESS_INDEX\# will be replaced by the index of the art process.
173 : fhicl::Atom<std::string> art_command_line{fhicl::Name{"art_command_line"}, fhicl::Comment{"Command line used to start analysis processes. Supports two special sequences: #CONFIG_FILE# will be replaced with the fhicl config file. #PROCESS_INDEX# will be replaced by the index of the art process."}, "art -c #CONFIG_FILE#"};
174 : /// "art_index_offset" (Default: 0): Offset to add to art process index when replacing \#PROCESS_INDEX\#
175 : fhicl::Atom<size_t> art_index_offset{fhicl::Name{"art_index_offset"}, fhicl::Comment{"Offset to add to art process index when replacing #PROCESS_INDEX#"}, 0};
176 : /// "minimum_art_lifetime_s" (Default: 2 seconds): Amount of time that an art process should run to not be considered "DOA"
177 : fhicl::Atom<double> minimum_art_lifetime_s{fhicl::Name{"minimum_art_lifetime_s"}, fhicl::Comment{"Amount of time that an art process should run to not be considered \"DOA\""}, 2.0};
178 : /// "expected_art_event_processing_time_us" (Default: 100000 us): During shutdown, SMEM will wait for this amount of time while it is checking that the art threads are done reading buffers.
179 : /// (TUNING: Should be slightly longer than the mean art processing time, but not so long that the Stop transition times out)
180 : fhicl::Atom<size_t> expected_art_event_processing_time_us{fhicl::Name{"expected_art_event_processing_time_us"}, fhicl::Comment{"During shutdown, SMEM will wait for this amount of time while it is checking that the art threads are done reading buffers."}, 100000};
181 : /// "broadcast_shared_memory_key" (Default: 0xCEE7000 + PID): Key to use for broadcast shared memory access
182 : fhicl::Atom<uint32_t> broadcast_shared_memory_key{fhicl::Name{"broadcast_shared_memory_key"}, fhicl::Comment{""}, 0xCEE70000 + getpid()};
183 : /// "broadcast_buffer_count" (Default: 10): Buffers in the broadcast shared memory segment
184 : fhicl::Atom<size_t> broadcast_buffer_count{fhicl::Name{"broadcast_buffer_count"}, fhicl::Comment{"Buffers in the broadcast shared memory segment"}, 10};
185 : /// "broadcast_buffer_size" (Default: 0x100000): Size of the buffers in the broadcast shared memory segment
186 : fhicl::Atom<size_t> broadcast_buffer_size{fhicl::Name{"broadcast_buffer_size"}, fhicl::Comment{"Size of the buffers in the broadcast shared memory segment"}, 0x100000};
187 : /// "use_art" (Default: true): Whether to start and manage art threads (Sets art_analyzer count to 0 and overwrite_mode to true when false)
188 : fhicl::Atom<bool> use_art{fhicl::Name{"use_art"}, fhicl::Comment{"Whether to start and manage art threads (Sets art_analyzer count to 0 and overwrite_mode to true when false)"}, true};
189 : /// "manual_art" (Default: false): Prints the startup command line for the art process so that the user may (for example) run it in GDB or valgrind
190 : fhicl::Atom<bool> manual_art{fhicl::Name{"manual_art"}, fhicl::Comment{"Prints the startup command line for the art process so that the user may (for example) run it in GDB or valgrind"}, false};
191 : /// Configuration of the RequestSender. See artdaq::RequestSender::Config
192 : fhicl::TableFragment<artdaq::RequestSender::Config> requestSenderConfig;
193 : /// Configuration of the TokenSender. See artdaq::TokenSender::Config
194 : fhicl::OptionalTable<artdaq::TokenSender::Config> tokenSenderConfig{fhicl::Name{"routing_token_config"}, fhicl::Comment{"Configuration for the Routing TokenSender"}};
195 : };
196 : /// Used for ParameterSet validation (if desired)
197 : using Parameters = fhicl::WrappedTable<Config>;
198 :
199 : /**
200 : * \brief SharedMemoryEventManager Constructor
201 : * \param pset ParameterSet used to configure SharedMemoryEventManager. See artdaq::SharedMemoryEventManager::Config for description of parameters
202 : * \param art_pset ParameterSet used to configure art. See art::Config for description of expected document format
203 : */
204 : SharedMemoryEventManager(const fhicl::ParameterSet& pset, fhicl::ParameterSet art_pset);
205 : /**
206 : * \brief SharedMemoryEventManager Destructor
207 : */
208 : virtual ~SharedMemoryEventManager() noexcept;
209 :
210 : private:
211 : /**
212 : * \brief Add a Fragment to the SharedMemoryEventManager
213 : * \param frag Header of the Fragment (seq ID and size info)
214 : * \param dataPtr Pointer to the fragment's data (i.e. Fragment::headerAddress())
215 : * \return Whether the Fragment was successfully added
216 : */
217 : bool AddFragment(detail::RawFragmentHeader frag, void* dataPtr);
218 :
219 : public:
220 : /**
221 : * \brief Copy a Fragment into the SharedMemoryEventManager
222 : * \param frag FragmentPtr object
223 : * \param timeout_usec Timeout for adding Fragment to the Shared Memory
224 : * \param [out] outfrag Rejected Fragment if timeout occurs
225 : * \return Whether the Fragment was successfully added
226 : */
227 : bool AddFragment(FragmentPtr frag, size_t timeout_usec, FragmentPtr& outfrag);
228 :
229 : /**
230 : * \brief Get a pointer to a reserved memory area for the given Fragment header
231 : * \param frag Fragment header (contains sequence ID and size information)
232 : * \param dropIfNoBuffersAvailable Whether to drop the fragment (instead of returning nullptr) when no buffers are available (Default: false)
233 : * \return Pointer to memory location for Fragment body (Header is copied into buffer here)
234 : */
235 : RawDataType* WriteFragmentHeader(detail::RawFragmentHeader frag, bool dropIfNoBuffersAvailable = false);
236 :
237 : /**
238 : * \brief Used to indicate that the given Fragment is now completely in the buffer. Will check for buffer completeness, and unset the pending flag.
239 : * \param frag Fragment that is now completely in the buffer.
240 : */
241 : void DoneWritingFragment(detail::RawFragmentHeader frag);
242 :
243 : /**
244 : * \brief Returns the number of buffers which contain data but are not yet complete
245 : * \return The number of buffers which contain data but are not yet complete
246 : */
247 41 : size_t GetOpenEventCount() { return active_buffers_.size(); }
248 :
249 : /**
250 : * \brief Returns the number of events which are complete but waiting on lower sequenced events to finish
251 : * \return The number of events which are complete but waiting on lower sequenced events to finish
252 : */
253 6 : size_t GetPendingEventCount() { return pending_buffers_.size(); }
254 :
255 : /**
256 : * \brief Returns the number of buffers currently owned by this manager
257 : * \return The number of buffers currently owned by this manager
258 : */
259 1 : size_t GetLockedBufferCount() { return GetBuffersOwnedByManager().size(); }
260 :
261 : /**
262 : * \brief Returns the number of events sent to art this run
263 : * \return The number of events sent to art this run
264 : */
265 19 : size_t GetArtEventCount() { return run_event_count_; }
266 :
267 : /**
268 : * \brief Get the count of Fragments of a given type in an event
269 : * \param seqID Sequence ID of Fragments
270 : * \param type Type of fragments to count. Use InvalidFragmentType to count all fragments (default)
271 : * \return Number of Fragments in event of given type
272 : */
273 : size_t GetFragmentCount(Fragment::sequence_id_t seqID, Fragment::type_t type = Fragment::InvalidFragmentType);
274 :
275 : /**
276 : * \brief Get the count of Fragments of a given type in a buffer
277 : * \param buffer Buffer to count
278 : * \param type Type of fragments to count. Use InvalidFragmentType to count all fragments (default)
279 : * \return Number of Fragments in buffer of given type
280 : */
281 : size_t GetFragmentCountInBuffer(int buffer, Fragment::type_t type = Fragment::InvalidFragmentType);
282 :
283 : void UpdateFragmentHeader(int buffer, detail::RawFragmentHeader hdr);
284 :
285 : /**
286 : * \brief Run an art instance, recording the return codes and restarting it until the end flag is raised
287 : */
288 : void RunArt(size_t process_index, const std::shared_ptr<std::atomic<pid_t>>& pid_out);
289 : /**
290 : * \brief Start all the art processes
291 : */
292 : void StartArt();
293 :
294 : /**
295 : * \brief Start one art process
296 : * \param pset ParameterSet to send to this art process
297 : * \param process_index Index of this art process (when starting multiple)
298 : * \return pid_t of the started process
299 : */
300 : pid_t StartArtProcess(fhicl::ParameterSet pset, size_t process_index);
301 :
302 : /**
303 : * \brief Shutdown a set of art processes
304 : * \param pids PIDs of the art processes
305 : */
306 : void ShutdownArtProcesses(std::set<pid_t>& pids);
307 :
308 : /**
309 : * \brief Restart all art processes, using the given fhicl code to configure the new art processes
310 : * \param art_pset ParameterSet used to configure art
311 : * \param newRun New Run number for reconfigured art
312 : * \param n_art_processes Number of art processes to start, -1 (default) leaves the number unchanged
313 : */
314 : void ReconfigureArt(fhicl::ParameterSet art_pset, run_id_t newRun = 0, int n_art_processes = -1);
315 :
316 : /**
317 : * \brief Indicate that the end of input has been reached to the art processes.
318 : * \return True if the end proceeded correctly
319 : *
320 : * Put the end-of-data marker onto the RawEvent queue (if possible),
321 : * wait for the reader function to exit, and fill in the reader return
322 : * value. This scenario returns true. If the end-of-data marker
323 : * can not be pushed onto the RawEvent queue, false is returned.
324 : */
325 : bool endOfData();
326 :
327 : /**
328 : * \brief Start a Run
329 : * \param runID Run number of the new run
330 : */
331 : void startRun(run_id_t runID);
332 :
333 : /**
334 : * \brief Get the current Run number
335 : * \return The current Run number
336 : */
337 6 : run_id_t runID() const { return run_id_; }
338 :
339 : /**
340 : * \brief Send an EndOfRunFragment to the art thread
341 : * \return True if enqueue successful
342 : */
343 : bool endRun();
344 :
345 : /**
346 : * \brief Rollover the subrun after the specified event
347 : * \param boundary sequence ID of the boundary (Event with seqID == boundary will be in new subrun)
348 : * \param subrun Subrun number of subrun after boundary
349 : * \param sendFragment Create and send an EndOfSubrun Fragment for this transition
350 : */
351 : void rolloverSubrun(sequence_id_t boundary, subrun_id_t subrun, bool sendFragment);
352 :
353 : /**
354 : * \brief Add a subrun transition immediately after the highest currently define sequence ID
355 : */
356 : void rolloverSubrun(bool sendFragment);
357 :
358 : /**
359 : * \brief Send metrics to the MetricManager, if one has been instantiated in the application
360 : */
361 : void sendMetrics();
362 :
363 : /**
364 : * \brief Set the RequestMessageMode for all outgoing data requests
365 : * \param mode Mode to set
366 : */
367 2 : void setRequestMode(detail::RequestMessageMode mode)
368 : {
369 2 : if (requests_) requests_->SetRequestMode(mode);
370 2 : }
371 :
372 : /**
373 : * \brief Set the overwrite flag (non-reliable data transfer) for the Shared Memory
374 : * \param overwrite Whether to allow the writer to overwrite data that has not yet been read
375 : */
376 : void setOverwrite(bool overwrite) { overwrite_mode_ = overwrite; }
377 :
378 : /**
379 : * \brief Set the stored Init fragment, if one has not yet been set already.
380 : */
381 : void AddInitFragment(FragmentPtr& frag);
382 :
383 : /**
384 : * @brief Add a Fragment for broadcast. May be collected with other Fragments before sending
385 : * @param frag Fragment to broadcast
386 : */
387 : void BroadcastFragment(FragmentPtr& frag);
388 :
389 : /**
390 : * \brief Gets the shared memory key of the broadcast SharedMemoryManager
391 : * \return The shared memory key of the broadcast SharedMemoryManager
392 : */
393 1 : uint32_t GetBroadcastKey() { return broadcasts_.GetKey(); }
394 :
395 : /**
396 : * \brief Gets the address of the "dropped data" fragment. Used for testing.
397 : * \param frag Fragment ID to get "dropped data" for
398 : * \return Pointer to the data payload of the "dropped data" fragment
399 : */
400 2 : RawDataType* GetDroppedDataAddress(detail::RawFragmentHeader frag)
401 : {
402 2 : for (auto it = dropped_data_.begin(); it != dropped_data_.end(); ++it)
403 : {
404 2 : if (frag.operator==(it->first)) // TODO, ELF 5/26/2023: Workaround until artdaq_core can be fixed for C++20
405 : {
406 2 : return it->second->dataBegin();
407 : }
408 : }
409 0 : return nullptr;
410 : }
411 :
412 : /**
413 : * \brief Updates the internally-stored copy of the art configuration.
414 : * \param art_pset ParameterSet used to configure art
415 : *
416 : * This method updates the internally-stored copy of the art configuration, but it does not
417 : * restart art processes. So, if this method is called while art processes are running, it will
418 : * have no effect until the next restart, such as the next Start of run. Typically, this
419 : * method is intended to be called between runs, when no art processes are running.
420 : */
421 : void UpdateArtConfiguration(fhicl::ParameterSet art_pset);
422 :
423 : /**
424 : * \brief Check for buffers which are ready to be marked incomplete and released to art and issue tokens for any buffers which are avaialble
425 : */
426 : void CheckPendingBuffers();
427 :
428 : /**
429 : * \brief Get the subrun number that the given Sequence ID would be assigned to
430 : * \param seqID Sequence ID to check
431 : * \return Subrun number that the given sequence ID will be associated with
432 : */
433 : subrun_id_t GetSubrunForSequenceID(Fragment::sequence_id_t seqID);
434 :
435 : /**
436 : * \brief Get the current subrun number (Gets the last defined subrun)
437 : * \return Number of the subrun that corresponds to events with the maximum possible sequence ID.
438 : */
439 4 : subrun_id_t GetCurrentSubrun() { return GetSubrunForSequenceID(Fragment::InvalidSequenceID); }
440 :
441 0 : std::string BuildStatisticsString() const { return buildStatisticsString_(); };
442 :
443 : private:
444 : SharedMemoryEventManager(SharedMemoryEventManager const&) = delete;
445 : SharedMemoryEventManager(SharedMemoryEventManager&&) = delete;
446 : SharedMemoryEventManager& operator=(SharedMemoryEventManager const&) = delete;
447 : SharedMemoryEventManager& operator=(SharedMemoryEventManager&&) = delete;
448 :
449 44 : size_t get_art_process_count_()
450 : {
451 44 : std::unique_lock<std::mutex> lk(art_process_mutex_);
452 88 : return art_processes_.size();
453 44 : }
454 :
455 : std::string buildStatisticsString_() const;
456 :
457 : private:
458 : size_t num_art_processes_;
459 : size_t const num_fragments_per_event_;
460 : size_t const queue_size_;
461 : run_id_t run_id_;
462 :
463 : std::map<sequence_id_t, subrun_id_t> subrun_event_map_;
464 : subrun_id_t subrun_id_;
465 : size_t max_subrun_event_map_length_;
466 : static std::mutex subrun_event_map_mutex_;
467 : double subrun_transition_hold_time_s_;
468 : std::chrono::steady_clock::time_point last_event_time_;
469 :
470 : std::set<int> active_buffers_;
471 : std::set<int> pending_buffers_;
472 : std::unordered_map<Fragment::sequence_id_t, size_t> released_incomplete_events_;
473 : std::set<Fragment::sequence_id_t> released_events_;
474 : size_t max_event_list_length_;
475 :
476 : bool update_run_ids_;
477 : bool use_sequence_id_for_event_number_;
478 : bool overwrite_mode_;
479 : size_t init_fragment_count_;
480 : std::atomic<bool> running_;
481 :
482 : std::unordered_map<int, std::atomic<int>> buffer_writes_pending_;
483 : std::unordered_map<int, std::mutex> buffer_mutexes_;
484 : static std::mutex sequence_id_mutex_;
485 :
486 : int open_event_report_interval_ms_;
487 : std::chrono::steady_clock::time_point last_open_event_report_time_;
488 : std::chrono::steady_clock::time_point last_backpressure_report_time_;
489 : std::chrono::steady_clock::time_point last_fragment_header_write_time_;
490 : std::vector<std::chrono::steady_clock::time_point> event_timing_;
491 :
492 : StatisticsHelper statsHelper_;
493 :
494 : int broadcast_timeout_ms_;
495 :
496 : std::atomic<int> run_event_count_;
497 : std::atomic<int> run_incomplete_event_count_;
498 : std::atomic<int> subrun_event_count_;
499 : std::atomic<int> subrun_incomplete_event_count_;
500 : std::atomic<int> oversize_fragment_count_;
501 : int maximum_oversize_fragment_count_;
502 :
503 : mutable std::mutex art_process_mutex_;
504 : std::set<pid_t> art_processes_;
505 : std::atomic<bool> restart_art_;
506 : bool always_restart_art_;
507 : std::atomic<bool> manual_art_;
508 : fhicl::ParameterSet current_art_pset_;
509 : std::shared_ptr<art_config_file> current_art_config_file_;
510 : std::string art_cmdline_;
511 : size_t art_process_index_offset_;
512 : double minimum_art_lifetime_s_;
513 : size_t art_event_processing_time_us_;
514 :
515 : std::unique_ptr<RequestSender> requests_;
516 : std::unique_ptr<TokenSender> tokens_;
517 : fhicl::ParameterSet data_pset_;
518 :
519 : std::mutex init_fragments_mutex_;
520 : std::unordered_map<Fragment::fragment_id_t, std::unordered_map<int, FragmentPtr>> init_fragment_map_;
521 : bool init_frags_sent_{false};
522 : std::list<std::pair<detail::RawFragmentHeader, FragmentPtr>> dropped_data_;
523 :
524 : mutable std::mutex broadcast_mutex_;
525 : struct BroadcastEntry
526 : {
527 : Fragment::type_t type;
528 : Fragment::sequence_id_t sequence_id;
529 : subrun_id_t subrun_id;
530 : FragmentPtrs fragments;
531 : std::chrono::steady_clock::time_point deadline;
532 : };
533 : std::vector<BroadcastEntry> pending_broadcasts_;
534 : void check_pending_broadcasts_();
535 :
536 : bool broadcastFragments_(FragmentPtrs& frags);
537 :
538 : detail::RawEventHeader* getEventHeader_(int buffer);
539 :
540 : int getBufferForSequenceID_(Fragment::sequence_id_t seqID, bool create_new, Fragment::timestamp_t timestamp = Fragment::InvalidTimestamp);
541 : bool hasFragments_(int buffer);
542 : void complete_buffer_(int buffer);
543 : bool bufferComparator(int bufA, int bufB);
544 : void check_pending_buffers_(std::unique_lock<std::mutex> const& lock);
545 : std::vector<char*> parse_art_command_line_(const std::shared_ptr<art_config_file>& config_file, size_t process_index);
546 :
547 : void send_init_frags_();
548 : size_t init_fragment_map_size_() const;
549 : SharedMemoryManager broadcasts_;
550 : };
551 : } // namespace artdaq
552 :
553 : #endif // ARTDAQ_DAQRATE_SHAREDMEMORYEVENTMANAGER_HH
|