Line data Source code
1 : #ifndef ARTDAQ_DAQRATE_DATATRANSFERMANAGER_HH
2 : #define ARTDAQ_DAQRATE_DATATRANSFERMANAGER_HH
3 :
4 : #include "TRACE/tracemf.h" // Pre-empt TRACE/trace.h from Fragment.hh.
5 : #include "artdaq-core/Data/Fragment.hh"
6 :
7 : #include "artdaq/DAQrate/detail/FragCounter.hh"
8 : #include "artdaq/TransferPlugins/TransferInterface.hh"
9 :
10 : namespace fhicl {
11 : class ParameterSet;
12 : }
13 :
14 : #include <condition_variable>
15 : #include <map>
16 : #include <memory>
17 : #include <set>
18 :
19 : namespace artdaq {
20 : class FragmentReceiverManager;
21 : class FragmentStoreElement;
22 : } // namespace artdaq
23 :
24 : /**
25 : * \brief Receives Fragment objects from one or more DataSenderManager instances using TransferInterface plugins
26 : * FragmentReceiverManager runs a reception thread for each source, and can automatically suppress reception from
27 : * sources which are going faster than the others.
28 : */
29 : class artdaq::FragmentReceiverManager
30 : {
31 : public:
32 : /**
33 : * \brief FragmentReceiverManager Constructor
34 : * \param ps ParameterSet used to configure the FragmentReceiverManager
35 : *
36 : * \verbatim
37 : * FragmentReceiverManager accepts the following Parameters:
38 : * "auto_suppression_enabled" (Default: true): Whether to suppress a source that gets too far ahead
39 : * "max_receive_difference" (Default: 50): Threshold (in sequence ID) for suppressing a source
40 : * "receive_timeout_usec" (Default: 100000): The timeout for receive operations
41 : * "enabled_sources" (OPTIONAL): List of sources which are enabled. If not specified, all sources are assumed enabled
42 : * "sources" (Default: blank table): FHiCL table containing TransferInterface configurations for each source.
43 : * NOTE: "source_rank" MUST be specified (and unique) for each source!
44 : * \endverbatim
45 : */
46 : explicit FragmentReceiverManager(const fhicl::ParameterSet& ps);
47 :
48 : /**
49 : * \brief FragmentReceiverManager Destructor
50 : */
51 : virtual ~FragmentReceiverManager();
52 :
53 : /**
54 : * \brief Receive a Fragment
55 : * \param[out] rank Rank of sender that sent the Fragment, or RECV_TIMEOUT
56 : * \param timeout_usec Timeout to wait for a Fragment to become ready
57 : * \return Pointer to received Fragment. May be nullptr if no Fragments are ready
58 : */
59 : FragmentPtr recvFragment(int& rank, size_t timeout_usec = 0);
60 :
61 : /**
62 : * \brief Return the count of Fragment objects received by this FragmentReceiverManager
63 : * \return The count of Fragment objects received by this FragmentReceiverManager
64 : */
65 : size_t count() const;
66 :
67 : /**
68 : * \brief Get the count of Fragment objects received by this FragmentReceiverManager from a given source
69 : * \param rank Source rank to get count for
70 : * \return The count of Fragment objects received by this FragmentReceiverManager from the source
71 : */
72 : size_t slotCount(size_t rank) const;
73 :
74 : /**
75 : * \brief Get the total size of all data received by this FragmentReceiverManager
76 : * \return The total size of all data received by this FragmentReceiverManager
77 : */
78 : size_t byteCount() const;
79 :
80 : /**
81 : * \brief Start receiver threads for all enabled sources
82 : */
83 : void start_threads();
84 :
85 : /**
86 : * \brief Get the list of enabled sources
87 : * \return The list of enabled sources
88 : */
89 : std::set<int> enabled_sources() const;
90 :
91 : /**
92 : * \brief Get the list of sources which are still receiving data
93 : * \return std::set containing ranks of sources which are still receiving data
94 : */
95 : std::set<int> running_sources() const;
96 :
97 : private:
98 : FragmentReceiverManager(FragmentReceiverManager const&) = delete; // Copying and moving are disabled: all four copy/move operations are deleted
99 : FragmentReceiverManager(FragmentReceiverManager&&) = delete;
100 : FragmentReceiverManager& operator=(FragmentReceiverManager const&) = delete;
101 : FragmentReceiverManager& operator=(FragmentReceiverManager&&) = delete;
102 :
103 : void runReceiver_(int); // NOTE(review): presumably the per-source receive loop, with the int being the source rank — confirm in the implementation file
104 :
105 : bool fragments_ready_() const;
106 :
107 : int get_next_source_() const;
108 :
109 : std::atomic<bool> stop_requested_; // NOTE(review): <atomic>, <mutex>, <chrono>, <unordered_map> and boost::thread are used by these members but are not in this header's visible #include list
110 :
111 : std::map<int, boost::thread> source_threads_;
112 : std::map<int, std::unique_ptr<TransferInterface>> source_plugins_;
113 : std::unordered_map<int, std::pair<size_t, double>> source_metric_data_;
114 : std::unordered_map<int, std::chrono::steady_clock::time_point> source_metric_send_time_;
115 : std::unordered_map<int, std::atomic<bool>> enabled_sources_;
116 : std::unordered_map<int, std::atomic<bool>> running_sources_;
117 :
118 : std::map<int, FragmentStoreElement> fragment_store_; // NOTE(review): FragmentStoreElement is only forward-declared at this point (defined later in this header) — confirm std::map with an incomplete mapped type is acceptable for the supported toolchains
119 :
120 : std::mutex input_cv_mutex_;
121 : std::condition_variable input_cv_;
122 : std::mutex output_cv_mutex_;
123 : std::condition_variable output_cv_;
124 :
125 : detail::FragCounter recv_frag_count_; // Number of frags received per source.
126 : detail::FragCounter recv_frag_size_; // Number of bytes received per source.
127 : detail::FragCounter recv_seq_count_; // For counting sequence IDs
128 : bool suppress_noisy_senders_;
129 : size_t suppression_threshold_;
130 :
131 : size_t receive_timeout_;
132 : mutable int last_source_; // mutable: may be modified from const member functions
133 : };
134 :
135 : /**
136 : * \brief This class contains tracking information for all Fragment objects which have been received from a specific source
137 : *
138 : * This class was designed so that there could be a mutex for each source, instead of locking all sources whenever a
139 : * Fragment had to be retrieved. FragmentStoreElement is itself a container type, sorted by Fragment arrival time. It is a
140 : * modified queue, with only the first element accessible, but it allows elements to be added to either end (for rejected Fragments).
141 : */
142 : class artdaq::FragmentStoreElement
143 : {
144 : public:
145 : /**
146 : * \brief FragmentStoreElement Constructor; the element starts out empty and a diagnostic line is written to std::cout
147 : */
148 0 : FragmentStoreElement()
149 0 : : frags_()
150 0 : , empty_(true)
151 : {
152 0 : std::cout << "FragmentStoreElement CONSTRUCTOR" << std::endl;
153 0 : }
154 :
155 : /**
156 : * \brief Is this FragmentStoreElement empty, i.e. does it currently hold no Fragment objects?
157 : * \return True if no Fragment objects are contained in this FragmentStoreElement (reads the atomic flag, no lock taken)
158 : */
159 0 : bool empty() const
160 : {
161 0 : return empty_;
162 : }
163 :
164 : /**
165 : * \brief Add a Fragment to the front of the FragmentStoreElement
166 : * \param frag Fragment to add
167 : */
168 : void emplace_front(FragmentPtr&& frag)
169 : {
170 : std::unique_lock<std::mutex> lk(mutex_);
171 : frags_.emplace_front(std::move(frag));
172 : empty_ = false;
173 : }
174 :
175 : /**
176 : * \brief Add a Fragment to the end of the FragmentStoreElement
177 : * \param frag Fragment to add
178 : */
179 0 : void emplace_back(FragmentPtr&& frag)
180 : {
181 0 : std::unique_lock<std::mutex> lk(mutex_);
182 0 : frags_.emplace_back(std::move(frag));
183 0 : empty_ = false;
184 0 : }
185 :
186 : /**
187 : * \brief Remove the first Fragment from the FragmentStoreElement and return it
188 : * \return The first Fragment in the FragmentStoreElement. No emptiness check is made here, so check empty() before calling
189 : */
190 0 : FragmentPtr front()
191 : {
192 0 : std::unique_lock<std::mutex> lk(mutex_);
193 0 : auto current_fragment = std::move(frags_.front());
194 0 : frags_.pop_front();
195 0 : empty_ = frags_.empty();
196 0 : return current_fragment;
197 0 : }
198 :
199 : /**
200 : * \brief Set the End-Of-Data marker value for this Receiver
201 : * \param eod Number of Receives expected for this receiver
202 : */
203 0 : void SetEndOfData(size_t eod) { eod_marker_ = eod; }
204 : /**
205 : * \brief Get the value of the End-Of-Data marker for this Receiver
206 : * \return The value of the End-Of-Data marker. Returns -1 (0xFFFFFFFFFFFFFFFF) if no EndOfData Fragments received
207 : */
208 0 : size_t GetEndOfData() const { return eod_marker_; }
209 :
210 : /**
211 : * \brief Get the number of Fragments stored in this FragmentStoreElement (read while holding the internal mutex)
212 : * \return The number of Fragments stored in this FragmentStoreElement
213 : */
214 0 : size_t size() const { std::unique_lock<std::mutex> lk(mutex_); return frags_.size(); }
215 :
216 : private:
217 : mutable std::mutex mutex_;
218 : FragmentPtrs frags_;
219 : std::atomic<bool> empty_;
220 : size_t eod_marker_{0xFFFFFFFFFFFFFFFF};
221 : };
222 :
223 : inline size_t // Defined inline in the header, outside the class body
224 0 : artdaq::FragmentReceiverManager::
225 : count() const
226 : {
227 0 : return recv_frag_count_.count(); // count() of the FragCounter commented as "Number of frags received per source"
228 : }
229 :
230 : inline size_t // Defined inline in the header, outside the class body
231 : artdaq::FragmentReceiverManager::
232 : slotCount(size_t rank) const
233 : {
234 : return recv_frag_count_.slotCount(rank); // rank is forwarded unchanged to the FragCounter; no range check is done here
235 : }
236 :
237 : inline size_t // Defined inline in the header, outside the class body
238 0 : artdaq::FragmentReceiverManager::
239 : byteCount() const
240 : {
241 0 : return recv_frag_size_.count(); // count() of the FragCounter commented as "Number of bytes received per source"
242 : }
243 : #endif // ARTDAQ_DAQRATE_DATATRANSFERMANAGER_HH
|