Line data Source code
1 : #include "artdaq/DAQdata/Globals.hh"
2 : #define TRACE_NAME (app_name + "_FragmentReceiverManager").c_str()
3 :
4 : #include <chrono>
5 : #include <memory>
6 :
7 : #include "artdaq/DAQrate/FragmentReceiverManager.hh"
8 : #include "artdaq/TransferPlugins/MakeTransferPlugin.hh"
9 : #include "cetlib_except/exception.h"
10 :
11 0 : artdaq::FragmentReceiverManager::FragmentReceiverManager(const fhicl::ParameterSet& pset)
12 0 : : stop_requested_(false)
13 0 : , recv_frag_count_()
14 0 : , recv_frag_size_()
15 0 : , recv_seq_count_()
16 0 : , suppress_noisy_senders_(pset.get<bool>("auto_suppression_enabled", true))
17 0 : , suppression_threshold_(pset.get<size_t>("max_receive_difference", 50))
18 0 : , receive_timeout_(pset.get<size_t>("receive_timeout_usec", 100000))
19 0 : , last_source_(-1)
20 : {
21 0 : TLOG(TLVL_DEBUG + 32) << "Constructor";
22 0 : auto enabled_srcs = pset.get<std::vector<int>>("enabled_sources", std::vector<int>());
23 0 : auto enabled_srcs_empty = enabled_srcs.empty();
24 0 : if (enabled_srcs_empty)
25 : {
26 0 : TLOG(TLVL_INFO) << "enabled_sources not specified, assuming all sources enabled.";
27 : }
28 : else
29 : {
30 0 : for (auto& s : enabled_srcs)
31 : {
32 0 : enabled_sources_[s] = true;
33 : }
34 : }
35 :
36 0 : auto srcs = pset.get<fhicl::ParameterSet>("sources", fhicl::ParameterSet());
37 0 : for (auto& s : srcs.get_pset_names())
38 : {
39 : try
40 : {
41 : auto transfer = std::unique_ptr<TransferInterface>(MakeTransferPlugin(srcs, s,
42 0 : TransferInterface::Role::kReceive));
43 0 : auto source_rank = transfer->source_rank();
44 0 : if (enabled_srcs_empty)
45 : {
46 0 : enabled_sources_[source_rank] = true;
47 : }
48 0 : else if (enabled_sources_.count(source_rank) == 0u)
49 : {
50 0 : enabled_sources_[source_rank] = false;
51 : }
52 0 : running_sources_[source_rank] = false;
53 0 : source_plugins_[source_rank] = std::move(transfer);
54 0 : fragment_store_[source_rank];
55 0 : source_metric_send_time_[source_rank] = std::chrono::steady_clock::now();
56 0 : source_metric_data_[source_rank] = std::pair<size_t, double>();
57 0 : }
58 0 : catch (const cet::exception& ex)
59 : {
60 0 : TLOG(TLVL_WARNING) << "cet::exception caught while setting up source " << s << ": " << ex.what();
61 0 : }
62 0 : catch (const std::exception& ex)
63 : {
64 0 : TLOG(TLVL_WARNING) << "std::exception caught while setting up source " << s << ": " << ex.what();
65 0 : }
66 0 : catch (...)
67 : {
68 0 : TLOG(TLVL_WARNING) << "Non-cet exception caught while setting up source " << s << ".";
69 0 : }
70 0 : }
71 0 : if (srcs.get_pset_names().empty())
72 : {
73 0 : TLOG(TLVL_ERROR) << "No sources configured!";
74 : }
75 0 : }
76 :
77 0 : artdaq::FragmentReceiverManager::~FragmentReceiverManager()
78 : {
79 0 : TLOG(TLVL_DEBUG + 32) << "Destructor";
80 0 : TLOG(TLVL_DEBUG + 34) << "~FragmentReceiverManager: BEGIN: Setting stop_requested to true, frags=" << count() << ", bytes=" << byteCount();
81 0 : stop_requested_ = true;
82 :
83 0 : TLOG(TLVL_DEBUG + 34) << "~FragmentReceiverManager: Notifying all threads";
84 0 : output_cv_.notify_all();
85 :
86 0 : TLOG(TLVL_DEBUG + 34) << "~FragmentReceiverManager: Joining all threads";
87 0 : for (auto& s : source_threads_)
88 : {
89 0 : auto& thread = s.second;
90 : try
91 : {
92 0 : if (thread.joinable())
93 : {
94 0 : thread.join();
95 : }
96 : }
97 0 : catch (...)
98 : {
99 : // IGNORED
100 0 : }
101 : }
102 0 : TLOG(TLVL_DEBUG + 34) << "~FragmentReceiverManager: DONE";
103 0 : }
104 :
105 0 : bool artdaq::FragmentReceiverManager::fragments_ready_() const
106 : {
107 0 : for (auto& it : fragment_store_)
108 : {
109 0 : if (enabled_sources_.count(it.first) == 0u)
110 : {
111 0 : continue;
112 : }
113 0 : if (!it.second.empty()) { return true; }
114 : }
115 0 : return false;
116 : }
117 :
118 0 : int artdaq::FragmentReceiverManager::get_next_source_() const
119 : {
120 : // std::unique_lock<std::mutex> lck(fragment_store_mutex_);
121 0 : std::set<int> ready_sources;
122 0 : for (auto& it : fragment_store_)
123 : {
124 0 : if (enabled_sources_.count(it.first) == 0u)
125 : {
126 0 : continue;
127 : }
128 0 : if (!it.second.empty())
129 : {
130 0 : ready_sources.insert(it.first);
131 : }
132 : }
133 :
134 0 : if (!ready_sources.empty())
135 : {
136 0 : auto iter = ready_sources.find(last_source_);
137 0 : if (iter == ready_sources.end() || ++iter == ready_sources.end())
138 : {
139 0 : TLOG(TLVL_DEBUG + 35) << "get_next_source returning " << *ready_sources.begin();
140 0 : last_source_ = *ready_sources.begin();
141 0 : return *ready_sources.begin();
142 : }
143 :
144 0 : TLOG(TLVL_DEBUG + 35) << "get_next_source returning " << *iter;
145 0 : last_source_ = *iter;
146 0 : return *iter;
147 : }
148 :
149 0 : TLOG(TLVL_DEBUG + 35) << "get_next_source returning -1";
150 0 : return -1;
151 0 : }
152 :
153 0 : void artdaq::FragmentReceiverManager::start_threads()
154 : {
155 0 : for (auto& source : source_plugins_)
156 : {
157 0 : auto& rank = source.first;
158 0 : if (enabled_sources_.count(rank) != 0u)
159 : {
160 0 : running_sources_[rank] = true;
161 : try
162 : {
163 0 : source_threads_[rank] = boost::thread(&FragmentReceiverManager::runReceiver_, this, rank);
164 : char tname[16]; // Size 16 - see man page pthread_setname_np(3) and/or prctl(2)
165 0 : snprintf(tname, sizeof(tname) - 1, "%d-%d FRecv", rank, my_rank); // NOLINT
166 0 : tname[sizeof(tname) - 1] = '\0'; // assure term. snprintf is not too evil :)
167 0 : auto handle = source_threads_[rank].native_handle();
168 0 : pthread_setname_np(handle, tname);
169 : }
170 0 : catch (const boost::exception& e)
171 : {
172 0 : TLOG(TLVL_ERROR) << "Caught boost::exception starting Receiver " << rank << " thread: " << boost::diagnostic_information(e) << ", errno=" << errno;
173 0 : std::cerr << "Caught boost::exception starting Receiver " << rank << " thread: " << boost::diagnostic_information(e) << ", errno=" << errno << std::endl;
174 0 : exit(5);
175 0 : }
176 : }
177 : }
178 0 : }
179 :
180 0 : artdaq::FragmentPtr artdaq::FragmentReceiverManager::recvFragment(int& rank, size_t timeout_usec)
181 : {
182 0 : TLOG(TLVL_DEBUG + 34) << "recvFragment entered tmo=" << timeout_usec << " us";
183 :
184 0 : if (timeout_usec == 0)
185 : {
186 0 : timeout_usec = 1000000;
187 : }
188 :
189 0 : auto ready = fragments_ready_();
190 0 : size_t waited = 0;
191 0 : auto wait_amount = timeout_usec / 1000 > 1000 ? timeout_usec / 1000 : 1000;
192 0 : TLOG(TLVL_DEBUG + 34) << "recvFragment fragment_ready_=" << ready << " before wait";
193 0 : while (!ready && waited < timeout_usec)
194 : {
195 : {
196 0 : std::unique_lock<std::mutex> lck(input_cv_mutex_);
197 0 : input_cv_.wait_for(lck, std::chrono::microseconds(wait_amount));
198 0 : }
199 0 : waited += wait_amount;
200 0 : ready = fragments_ready_();
201 0 : if (running_sources().empty())
202 : {
203 0 : break;
204 : }
205 : }
206 0 : TLOG(TLVL_DEBUG + 34) << "recvFragment fragment_ready_=" << ready << " after waited=" << waited;
207 0 : if (!ready)
208 : {
209 0 : TLOG(TLVL_DEBUG + 34) << "recvFragment: No fragments ready, returning empty";
210 0 : rank = TransferInterface::RECV_TIMEOUT;
211 0 : return std::unique_ptr<Fragment>{};
212 : }
213 :
214 0 : int current_source = get_next_source_();
215 0 : FragmentPtr current_fragment = fragment_store_[current_source].front();
216 0 : output_cv_.notify_all();
217 0 : rank = current_source;
218 :
219 0 : if (current_fragment != nullptr)
220 : {
221 0 : TLOG(TLVL_DEBUG + 34) << "recvFragment: Done rank=" << rank << ", fragment size=" << std::to_string(current_fragment->size()) << " words, seqId=" << current_fragment->sequenceID();
222 : }
223 0 : return current_fragment;
224 0 : }
225 :
226 0 : std::set<int> artdaq::FragmentReceiverManager::running_sources() const
227 : {
228 0 : std::set<int> output;
229 0 : for (auto& src : running_sources_)
230 : {
231 0 : if (src.second)
232 : {
233 0 : output.insert(src.first);
234 : }
235 : }
236 0 : return output;
237 0 : }
238 :
239 0 : std::set<int> artdaq::FragmentReceiverManager::enabled_sources() const
240 : {
241 0 : std::set<int> output;
242 0 : for (auto& src : enabled_sources_)
243 : {
244 0 : if (src.second)
245 : {
246 0 : output.insert(src.first);
247 : }
248 : }
249 0 : return output;
250 0 : }
251 :
252 0 : void artdaq::FragmentReceiverManager::runReceiver_(int source_rank)
253 : {
254 0 : while (!stop_requested_ && (enabled_sources_.count(source_rank) != 0u))
255 : {
256 0 : TLOG(TLVL_DEBUG + 36) << "runReceiver_ " << source_rank << ": Begin loop";
257 0 : auto is_suppressed = suppress_noisy_senders_ && recv_seq_count_.slotCount(source_rank) > suppression_threshold_ + recv_seq_count_.minCount();
258 0 : while (!stop_requested_ && is_suppressed)
259 : {
260 0 : TLOG(TLVL_DEBUG + 37) << "runReceiver_: Suppressing receiver rank " << source_rank;
261 0 : if (!is_suppressed)
262 : {
263 0 : input_cv_.notify_all();
264 : }
265 : else
266 : {
267 0 : std::unique_lock<std::mutex> lck(output_cv_mutex_);
268 0 : output_cv_.wait_for(lck, std::chrono::seconds(1));
269 0 : }
270 0 : is_suppressed = suppress_noisy_senders_ && recv_seq_count_.slotCount(source_rank) > suppression_threshold_ + recv_seq_count_.minCount();
271 : }
272 0 : if (stop_requested_)
273 : {
274 0 : running_sources_[source_rank] = false;
275 0 : return;
276 : }
277 :
278 0 : if (fragment_store_[source_rank].GetEndOfData() <= recv_frag_count_.slotCount(source_rank) && !source_plugins_[source_rank]->isRunning())
279 : {
280 0 : TLOG(TLVL_DEBUG + 32) << "runReceiver_: EndOfData conditions satisfied, ending receive loop";
281 0 : running_sources_[source_rank] = false;
282 0 : return;
283 : }
284 :
285 0 : auto start_time = std::chrono::steady_clock::now();
286 0 : TLOG(TLVL_DEBUG + 36) << "runReceiver_: Calling receiveFragment";
287 0 : auto fragment = std::make_unique<Fragment>();
288 : #if 0
289 : auto ret = source_plugins_[source_rank]->receiveFragment(*fragment, receive_timeout_);
290 : TLOG(TLVL_DEBUG + 36) << "runReceiver_: Done with receiveFragment, ret=" << ret << " (should be " << source_rank << ")";
291 : if (ret != source_rank) continue; // Receive timeout or other oddness
292 : #else
293 : artdaq::detail::RawFragmentHeader hdr;
294 0 : auto ret1 = source_plugins_[source_rank]->receiveFragmentHeader(hdr, receive_timeout_);
295 0 : TLOG(TLVL_DEBUG + 36) << "runReceiver_: Done with receiveFragmentHeader, ret1=" << ret1 << " (should be " << source_rank << ")";
296 :
297 0 : if (ret1 != source_rank)
298 : {
299 0 : continue; // Receive timeout or other oddness
300 : }
301 :
302 0 : fragment->resize(hdr.word_count - hdr.num_words());
303 0 : memcpy(fragment->headerAddress(), &hdr, hdr.num_words() * sizeof(artdaq::RawDataType));
304 0 : auto ret2 = source_plugins_[source_rank]->receiveFragmentData(fragment->headerAddress() + hdr.num_words(), hdr.word_count - hdr.num_words()); // NOLINT(cppcoreguidelines-pro-bounds-pointer-arithmetic)
305 0 : if (ret2 != ret1)
306 : {
307 0 : TLOG(TLVL_ERROR) << "ReceiveFragmentHeader returned " << ret1 << ", but ReceiveFragmentData returned " << ret2;
308 0 : continue;
309 0 : }
310 : #endif
311 :
312 0 : if (fragment->type() == artdaq::Fragment::EndOfDataFragmentType)
313 : {
314 0 : TLOG(TLVL_DEBUG + 33) << "runReceiver_: EndOfData Fragment received!";
315 0 : fragment_store_[source_rank].SetEndOfData(*reinterpret_cast<size_t*>(fragment->dataBegin())); // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
316 : }
317 0 : else if (fragment->type() == artdaq::Fragment::DataFragmentType || fragment->type() == artdaq::Fragment::ContainerFragmentType || fragment->isUserFragmentType(fragment->type()))
318 : {
319 0 : TLOG(TLVL_DEBUG + 33) << "runReceiver_: Data Fragment received!";
320 0 : recv_frag_count_.incSlot(source_rank);
321 0 : recv_frag_size_.incSlot(source_rank, fragment->size() * sizeof(RawDataType));
322 0 : recv_seq_count_.setSlot(source_rank, fragment->sequenceID());
323 : }
324 : else
325 : {
326 0 : continue;
327 : }
328 :
329 0 : auto delta_t = std::chrono::duration_cast<std::chrono::duration<double, std::ratio<1>>>(std::chrono::steady_clock::now() - start_time).count();
330 0 : source_metric_data_[source_rank].first += fragment->size() * sizeof(RawDataType);
331 0 : source_metric_data_[source_rank].second += delta_t;
332 :
333 0 : if (metricMan && TimeUtils::GetElapsedTime(source_metric_send_time_[source_rank]) > 1)
334 : {
335 0 : TLOG(TLVL_DEBUG + 37) << "runReceiver_: Sending receive stats";
336 0 : metricMan->sendMetric("Data Receive Time From Rank " + std::to_string(source_rank), source_metric_data_[source_rank].second, "s", 1, MetricMode::Accumulate);
337 0 : metricMan->sendMetric("Data Receive Size From Rank " + std::to_string(source_rank), source_metric_data_[source_rank].first, "B", 1, MetricMode::Accumulate);
338 0 : metricMan->sendMetric("Data Receive Rate From Rank " + std::to_string(source_rank), source_metric_data_[source_rank].first / source_metric_data_[source_rank].second, "B/s", 1, MetricMode::Average);
339 :
340 0 : source_metric_send_time_[source_rank] = std::chrono::steady_clock::now();
341 0 : source_metric_data_[source_rank].first = 0;
342 0 : source_metric_data_[source_rank].second = 0.0;
343 : }
344 :
345 0 : fragment_store_[source_rank].emplace_back(std::move(fragment));
346 0 : TLOG(TLVL_DEBUG + 33) << "runReceiver_: There are now " << fragment_store_[source_rank].size() << " Fragments stored from this source";
347 0 : input_cv_.notify_all();
348 0 : }
349 :
350 0 : running_sources_[source_rank] = false;
351 : }
|