Line data Source code
1 : #include "artdaq/DAQdata/Globals.hh"
2 : #define TRACE_NAME (app_name + "_DataReceiverManager").c_str()
3 :
4 : #include "artdaq/DAQdata/HostMap.hh"
5 : #include "artdaq/DAQrate/DataReceiverManager.hh"
6 : #include "artdaq/DAQrate/detail/MergeParameterSets.hh"
7 : #include "artdaq/TransferPlugins/MakeTransferPlugin.hh"
8 :
9 : #include "cetlib_except/exception.h"
10 : #include "fhiclcpp/ParameterSet.h"
11 :
12 : #include <boost/bind.hpp>
13 : #include <boost/exception/all.hpp>
14 : #include <boost/thread.hpp>
15 :
16 : #include <chrono>
17 : #include <iomanip>
18 : #include <thread>
19 : #include <utility>
20 :
21 2 : artdaq::DataReceiverManager::DataReceiverManager(const fhicl::ParameterSet& pset, std::shared_ptr<SharedMemoryEventManager> shm)
22 2 : : stop_requested_(false)
23 2 : , stop_requested_time_(0)
24 2 : , recv_frag_count_()
25 2 : , recv_frag_size_()
26 2 : , recv_seq_count_()
27 4 : , receive_timeout_(pset.get<size_t>("receive_timeout_usec", 100000))
28 4 : , stop_timeout_ms_(pset.get<size_t>("stop_timeout_ms", 1500))
29 2 : , shm_manager_(std::move(std::move(shm)))
30 4 : , non_reliable_mode_enabled_(pset.get<bool>("non_reliable_mode", false))
31 8 : , non_reliable_mode_retry_count_(pset.get<size_t>("non_reliable_mode_retry_count", -1))
32 : {
33 4 : TLOG(TLVL_DEBUG + 32) << "Constructor";
34 6 : auto enabled_srcs = pset.get<std::vector<int>>("enabled_sources", std::vector<int>());
35 2 : auto enabled_srcs_empty = enabled_srcs.empty();
36 :
37 2 : if (non_reliable_mode_enabled_)
38 : {
39 0 : TLOG(TLVL_WARNING) << "DataReceiverManager is configured to drop data after " << non_reliable_mode_retry_count_
40 0 : << " failed attempts to put data into the SharedMemoryEventManager! If this is unexpected, please check your configuration!";
41 : }
42 :
43 2 : if (enabled_srcs_empty)
44 : {
45 6 : TLOG(TLVL_INFO) << "enabled_sources not specified, assuming all sources enabled.";
46 : }
47 : else
48 : {
49 0 : for (auto& s : enabled_srcs)
50 : {
51 0 : enabled_sources_[s] = true;
52 : }
53 : }
54 :
55 2 : hostMap_t host_map = MakeHostMap(pset);
56 4 : auto max_fragment_size_words = pset.get<size_t>("max_fragment_size_words", 0);
57 6 : auto transfer_parameters = pset.get<fhicl::ParameterSet>("transfer_parameters", fhicl::ParameterSet());
58 :
59 6 : auto srcs = pset.get<fhicl::ParameterSet>("sources", fhicl::ParameterSet());
60 4 : for (auto& s : srcs.get_pset_names())
61 : {
62 2 : auto src_pset = srcs.get<fhicl::ParameterSet>(s);
63 2 : host_map = MakeHostMap(src_pset, host_map);
64 4 : }
65 2 : auto host_map_pset = MakeHostMapPset(host_map);
66 2 : fhicl::ParameterSet srcs_mod;
67 4 : for (auto& s : srcs.get_pset_names())
68 : {
69 2 : auto src_pset = srcs.get<fhicl::ParameterSet>(s);
70 4 : src_pset.erase("host_map");
71 2 : src_pset.put<std::vector<fhicl::ParameterSet>>("host_map", host_map_pset);
72 :
73 2 : if (max_fragment_size_words != 0 && !src_pset.has_key("max_fragment_size_words"))
74 : {
75 0 : src_pset.put<size_t>("max_fragment_size_words", max_fragment_size_words);
76 : }
77 :
78 2 : auto resultant_set = merge(transfer_parameters, src_pset);
79 :
80 2 : srcs_mod.put<fhicl::ParameterSet>(s, resultant_set);
81 4 : }
82 :
83 4 : for (auto& s : srcs_mod.get_pset_names())
84 : {
85 : try
86 : {
87 : auto transfer = std::unique_ptr<TransferInterface>(MakeTransferPlugin(srcs_mod, s,
88 2 : TransferInterface::Role::kReceive));
89 2 : auto source_rank = transfer->source_rank();
90 2 : if (enabled_srcs_empty)
91 : {
92 2 : enabled_sources_[source_rank] = true;
93 : }
94 0 : else if (enabled_sources_.count(source_rank) == 0u)
95 : {
96 0 : enabled_sources_[source_rank] = false;
97 : }
98 2 : running_sources_[source_rank] = false;
99 2 : source_plugins_[source_rank] = std::move(transfer);
100 2 : }
101 0 : catch (const cet::exception& ex)
102 : {
103 0 : TLOG(TLVL_WARNING) << "cet::exception caught while setting up source " << s << ": " << ex.what();
104 0 : }
105 0 : catch (const std::exception& ex)
106 : {
107 0 : TLOG(TLVL_WARNING) << "std::exception caught while setting up source " << s << ": " << ex.what();
108 0 : }
109 0 : catch (...)
110 : {
111 0 : TLOG(TLVL_WARNING) << "Non-cet exception caught while setting up source " << s << ".";
112 0 : }
113 2 : }
114 2 : if (srcs.get_pset_names().empty())
115 : {
116 0 : TLOG(TLVL_ERROR) << "No sources configured!";
117 : }
118 2 : }
119 :
120 2 : artdaq::DataReceiverManager::~DataReceiverManager()
121 : {
122 4 : TLOG(TLVL_DEBUG + 33) << "~DataReceiverManager: BEGIN";
123 2 : stop_threads();
124 2 : shm_manager_.reset();
125 4 : TLOG(TLVL_DEBUG + 33) << "Destructor END";
126 2 : }
127 :
128 1 : void artdaq::DataReceiverManager::start_threads()
129 : {
130 1 : stop_requested_ = false;
131 1 : if (shm_manager_)
132 : {
133 1 : shm_manager_->setRequestMode(artdaq::detail::RequestMessageMode::Normal);
134 : }
135 2 : for (auto& source : source_plugins_)
136 : {
137 1 : auto& rank = source.first;
138 1 : if ((enabled_sources_.count(rank) != 0u) && enabled_sources_[rank].load())
139 : {
140 1 : recv_frag_count_.setSlot(rank, 0);
141 1 : recv_frag_size_.setSlot(rank, 0);
142 1 : recv_seq_count_.setSlot(rank, 0);
143 :
144 1 : running_sources_[rank] = true;
145 1 : boost::thread::attributes attrs;
146 1 : attrs.set_stack_size(4096 * 2000); // 2000 KB
147 : try
148 : {
149 1 : source_threads_[rank] = boost::thread(attrs, boost::bind(&DataReceiverManager::runReceiver_, this, rank));
150 : char tname[16]; // Size 16 - see man page pthread_setname_np(3) and/or prctl(2)
151 1 : snprintf(tname, sizeof(tname) - 1, "%d-%d RECV", rank, my_rank); // NOLINT
152 1 : tname[sizeof(tname) - 1] = '\0'; // assure term. snprintf is not too evil :)
153 1 : auto handle = source_threads_[rank].native_handle();
154 1 : pthread_setname_np(handle, tname);
155 : }
156 0 : catch (const boost::exception& e)
157 : {
158 0 : TLOG(TLVL_ERROR) << "Caught boost::exception starting Receiver " << rank << " thread: " << boost::diagnostic_information(e) << ", errno=" << errno;
159 0 : std::cerr << "Caught boost::exception starting Receiver " << rank << " thread: " << boost::diagnostic_information(e) << ", errno=" << errno << std::endl;
160 0 : exit(5);
161 0 : }
162 1 : }
163 : }
164 1 : }
165 :
166 2 : void artdaq::DataReceiverManager::stop_threads()
167 : {
168 4 : TLOG(TLVL_DEBUG + 33) << "stop_threads: BEGIN: Setting stop_requested to true, frags=" << count() << ", bytes=" << byteCount();
169 :
170 2 : stop_requested_time_ = TimeUtils::gettimeofday_us();
171 2 : stop_requested_ = true;
172 :
173 2 : auto initial_count = running_sources().size();
174 4 : TLOG(TLVL_DEBUG + 33) << "stop_threads: Waiting for " << initial_count << " running receiver threads to stop";
175 2 : auto wait_start = std::chrono::steady_clock::now();
176 2 : auto last_report = std::chrono::steady_clock::now();
177 4 : while (!running_sources().empty() && TimeUtils::GetElapsedTime(wait_start) < 60.0)
178 : {
179 0 : usleep(10000);
180 0 : if (TimeUtils::GetElapsedTime(last_report) > 1.0)
181 : {
182 0 : TLOG(TLVL_DEBUG + 32) << "stop_threads: Waited " << TimeUtils::GetElapsedTime(wait_start) << " s for " << initial_count
183 0 : << " receiver threads to end (" << running_sources().size() << " remain)";
184 0 : last_report = std::chrono::steady_clock::now();
185 : }
186 : }
187 2 : if (!running_sources().empty())
188 : {
189 0 : TLOG(TLVL_WARNING) << "stop_threads: Timeout expired while waiting for all receiver threads to end. There are "
190 0 : << running_sources().size() << " threads remaining.";
191 : }
192 :
193 4 : TLOG(TLVL_DEBUG + 33) << "stop_threads: Joining " << source_threads_.size() << " receiver threads";
194 3 : for (auto& source_thread : source_threads_)
195 : {
196 2 : TLOG(TLVL_DEBUG + 33) << "stop_threads: Joining thread for source_rank " << source_thread.first;
197 : try
198 : {
199 1 : if (source_thread.second.joinable())
200 : {
201 1 : source_thread.second.join();
202 : }
203 : else
204 : {
205 0 : TLOG(TLVL_ERROR) << "stop_threads: Thread for source rank " << source_thread.first << " is not joinable!";
206 : }
207 : }
208 0 : catch (...)
209 : {
210 : // IGNORED
211 0 : }
212 : }
213 2 : source_threads_.clear(); // To prevent error messages from shutdown-after-stop
214 :
215 4 : TLOG(TLVL_DEBUG + 33) << "stop_threads: END";
216 2 : }
217 :
218 9 : std::set<int> artdaq::DataReceiverManager::enabled_sources() const
219 : {
220 9 : std::set<int> output;
221 18 : for (auto& src : enabled_sources_)
222 : {
223 9 : if (src.second)
224 : {
225 9 : output.insert(src.first);
226 : }
227 : }
228 9 : return output;
229 0 : }
230 :
231 25 : std::set<int> artdaq::DataReceiverManager::running_sources() const
232 : {
233 25 : std::set<int> output;
234 50 : for (auto& src : running_sources_)
235 : {
236 25 : if (src.second)
237 : {
238 13 : output.insert(src.first);
239 : }
240 : }
241 25 : return output;
242 0 : }
243 :
244 1 : void artdaq::DataReceiverManager::runReceiver_(int source_rank)
245 : {
246 1 : std::chrono::steady_clock::time_point start_time, after_header, before_body, after_body, end_time = std::chrono::steady_clock::now();
247 : int ret;
248 : detail::RawFragmentHeader header;
249 1 : size_t endOfDataCount = -1;
250 1 : auto sleep_time = receive_timeout_ / 100 > 100000 ? 100000 : receive_timeout_ / 100;
251 1 : if (sleep_time < 5000)
252 : {
253 1 : sleep_time = 5000;
254 : }
255 1 : auto max_retries = non_reliable_mode_retry_count_ * ceil(receive_timeout_ / sleep_time);
256 :
257 13 : while (!(stop_requested_ && TimeUtils::gettimeofday_us() - stop_requested_time_ > stop_timeout_ms_ * 1000) && (enabled_sources_.count(source_rank) != 0u))
258 : {
259 26 : TLOG(TLVL_DEBUG + 35) << "runReceiver_: Begin loop stop_requested_=" << stop_requested_ << ", stop_timeout_ms_=" << stop_timeout_ms_ << ", enabled_sources_.count(source_rank)=" << enabled_sources_.count(source_rank) << ", now - stop_requested_time_=" << (TimeUtils::gettimeofday_us() - stop_requested_time_);
260 13 : std::this_thread::yield();
261 :
262 : // Don't stop receiving until we haven't received anything for 1 second
263 13 : if (endOfDataCount <= recv_frag_count_.slotCount(source_rank) && !source_plugins_[source_rank]->isRunning())
264 : {
265 2 : TLOG(TLVL_DEBUG + 32) << "runReceiver_: End of Data conditions met, ending runReceiver loop";
266 1 : break;
267 : }
268 :
269 12 : start_time = std::chrono::steady_clock::now();
270 :
271 24 : TLOG(TLVL_DEBUG + 35) << "runReceiver_: Calling receiveFragmentHeader tmo=" << receive_timeout_;
272 12 : ret = source_plugins_[source_rank]->receiveFragmentHeader(header, receive_timeout_);
273 24 : TLOG(TLVL_DEBUG + 35) << "runReceiver_: Done with receiveFragmentHeader, ret=" << ret << " (should be " << source_rank << ")";
274 12 : if (ret != source_rank)
275 : {
276 10 : if (ret >= 0)
277 : {
278 0 : TLOG(TLVL_WARNING) << "Received Fragment from rank " << ret << ", but was expecting one from rank " << source_rank << "!";
279 : }
280 10 : else if (ret == TransferInterface::DATA_END)
281 : {
282 0 : TLOG(TLVL_ERROR) << "Transfer Plugin returned DATA_END, ending receive loop!";
283 0 : break;
284 : }
285 10 : if (*running_sources().begin() == source_rank) // Only do this for the first sender in the running_sources_ map
286 : {
287 20 : TLOG(TLVL_DEBUG + 34) << "Calling SMEM::CheckPendingBuffers from DRM receiver thread for " << source_rank << " to make sure that things aren't stuck";
288 10 : shm_manager_->CheckPendingBuffers();
289 : }
290 :
291 10 : usleep(sleep_time);
292 10 : continue; // Receive timeout or other oddness
293 10 : }
294 :
295 2 : after_header = std::chrono::steady_clock::now();
296 :
297 2 : if (Fragment::isUserFragmentType(header.type) || header.type == Fragment::DataFragmentType || header.type == Fragment::EmptyFragmentType || header.type == Fragment::ContainerFragmentType)
298 : {
299 2 : TLOG(TLVL_DEBUG + 33) << "Received Fragment Header from rank " << source_rank << ", sequence ID " << header.sequence_id << ", timestamp " << header.timestamp << ", type " << header.type;
300 1 : RawDataType* loc = nullptr;
301 1 : size_t retries = 0;
302 1 : auto latency_s = header.getLatency(true);
303 1 : auto latency = latency_s.tv_sec + (latency_s.tv_nsec / 1000000000.0);
304 2 : while (loc == nullptr) //&& TimeUtils::GetElapsedTimeMicroseconds(after_header)) < receive_timeout_)
305 : {
306 1 : loc = shm_manager_->WriteFragmentHeader(header);
307 :
308 : // Break here and outside of the loop to go to the cleanup steps at the end of runReceiver_
309 1 : if (loc == nullptr && stop_requested_)
310 : {
311 0 : break;
312 : }
313 :
314 1 : if (loc == nullptr)
315 : {
316 0 : usleep(sleep_time);
317 : }
318 1 : retries++;
319 1 : if (non_reliable_mode_enabled_ && retries > max_retries)
320 : {
321 0 : loc = shm_manager_->WriteFragmentHeader(header, true);
322 : }
323 : }
324 : // Break here to go to cleanup at the end of runReceiver_
325 1 : if (loc == nullptr && stop_requested_)
326 : {
327 0 : break;
328 : }
329 1 : if (loc == nullptr)
330 : {
331 : // Could not enqueue event!
332 0 : TLOG(TLVL_ERROR) << "runReceiver_: Could not get data location for event " << header.sequence_id;
333 0 : continue;
334 0 : }
335 1 : before_body = std::chrono::steady_clock::now();
336 :
337 2 : TLOG(TLVL_DEBUG + 35) << "runReceiver_: Calling receiveFragmentData from rank " << source_rank << ", sequence ID " << header.sequence_id << ", timestamp " << header.timestamp;
338 1 : auto ret2 = source_plugins_[source_rank]->receiveFragmentData(loc, header.word_count - header.num_words());
339 2 : TLOG(TLVL_DEBUG + 35) << "runReceiver_: Done with receiveFragmentData, ret2=" << ret2 << " (should be " << source_rank << ")";
340 :
341 1 : if (ret != ret2)
342 : {
343 0 : TLOG(TLVL_ERROR) << "Unexpected return code from receiveFragmentData after receiveFragmentHeader! (Expected: " << ret << ", Got: " << ret2 << ")";
344 0 : TLOG(TLVL_ERROR) << "Error receiving data from rank " << source_rank << ", data has been lost! Event " << header.sequence_id << " will most likely be Incomplete!";
345 :
346 : // Mark the Fragment as invalid
347 0 : header.valid = false;
348 0 : header.complete = false;
349 :
350 0 : shm_manager_->DoneWritingFragment(header);
351 : // throw cet::exception("DataReceiverManager") << "Unexpected return code from receiveFragmentData after receiveFragmentHeader! (Expected: " << ret << ", Got: " << ret2 << ")";
352 0 : continue;
353 0 : }
354 :
355 1 : shm_manager_->DoneWritingFragment(header);
356 2 : TLOG(TLVL_DEBUG + 33) << "Done receiving fragment with sequence ID " << header.sequence_id << " from rank " << source_rank;
357 :
358 1 : recv_frag_count_.incSlot(source_rank);
359 1 : recv_frag_size_.incSlot(source_rank, header.word_count * sizeof(RawDataType));
360 1 : recv_seq_count_.setSlot(source_rank, header.sequence_id);
361 1 : if (endOfDataCount != static_cast<size_t>(-1))
362 : {
363 0 : TLOG(TLVL_DEBUG + 32) << "Received fragment " << header.sequence_id << " from rank " << source_rank
364 0 : << " (" << recv_frag_count_.slotCount(source_rank) << "/" << endOfDataCount << ")";
365 : }
366 :
367 1 : after_body = std::chrono::steady_clock::now();
368 :
369 1 : auto hdr_delta_t = TimeUtils::GetElapsedTime(start_time, after_header);
370 1 : auto store_delta_t = TimeUtils::GetElapsedTime(after_header, before_body);
371 1 : auto data_delta_t = TimeUtils::GetElapsedTime(before_body, after_body);
372 1 : auto delta_t = TimeUtils::GetElapsedTime(start_time, after_body);
373 1 : auto dead_t = TimeUtils::GetElapsedTime(end_time, start_time);
374 1 : auto recv_wait_t = hdr_delta_t - latency;
375 :
376 1 : uint64_t data_size = header.word_count * sizeof(RawDataType);
377 1 : auto header_size = header.num_words() * sizeof(RawDataType);
378 :
379 1 : if (metricMan)
380 : { //&& recv_frag_count_.slotCount(source_rank) % 100 == 0) {
381 2 : TLOG(TLVL_DEBUG + 34) << "runReceiver_: Sending receive stats for rank " << source_rank;
382 4 : metricMan->sendMetric("Total Receive Time From Rank " + std::to_string(source_rank), delta_t, "s", 5, MetricMode::Accumulate);
383 4 : metricMan->sendMetric("Total Receive Size From Rank " + std::to_string(source_rank), data_size, "B", 5, MetricMode::Accumulate);
384 4 : metricMan->sendMetric("Total Receive Rate From Rank " + std::to_string(source_rank), data_size / delta_t, "B/s", 5, MetricMode::Average);
385 :
386 4 : metricMan->sendMetric("Header Receive Time From Rank " + std::to_string(source_rank), hdr_delta_t, "s", 5, MetricMode::Accumulate);
387 4 : metricMan->sendMetric("Header Receive Size From Rank " + std::to_string(source_rank), header_size, "B", 5, MetricMode::Accumulate);
388 4 : metricMan->sendMetric("Header Receive Rate From Rank " + std::to_string(source_rank), header_size / hdr_delta_t, "B/s", 5, MetricMode::Average);
389 :
390 1 : auto payloadSize = data_size - header_size;
391 4 : metricMan->sendMetric("Data Receive Time From Rank " + std::to_string(source_rank), data_delta_t, "s", 5, MetricMode::Accumulate);
392 4 : metricMan->sendMetric("Data Receive Size From Rank " + std::to_string(source_rank), payloadSize, "B", 5, MetricMode::Accumulate);
393 4 : metricMan->sendMetric("Data Receive Rate From Rank " + std::to_string(source_rank), payloadSize / data_delta_t, "B/s", 5, MetricMode::Average);
394 :
395 4 : metricMan->sendMetric("Data Receive Count From Rank " + std::to_string(source_rank), recv_frag_count_.slotCount(source_rank), "fragments", 3, MetricMode::LastPoint);
396 :
397 4 : metricMan->sendMetric("Total Shared Memory Wait Time From Rank " + std::to_string(source_rank), store_delta_t, "s", 3, MetricMode::Accumulate);
398 4 : metricMan->sendMetric("Avg Shared Memory Wait Time From Rank " + std::to_string(source_rank), store_delta_t, "s", 3, MetricMode::Average);
399 4 : metricMan->sendMetric("Avg Fragment Wait Time From Rank " + std::to_string(source_rank), dead_t, "s", 3, MetricMode::Average);
400 :
401 6 : metricMan->sendMetric("Rank", std::to_string(my_rank), "", 3, MetricMode::LastPoint);
402 6 : metricMan->sendMetric("App Name", app_name, "", 3, MetricMode::LastPoint);
403 4 : metricMan->sendMetric("Fragment Latency at Receive From Rank " + std::to_string(source_rank), latency, "s", 4, MetricMode::Average | MetricMode::Maximum);
404 4 : metricMan->sendMetric("Header Receive Wait Time From Rank" + std::to_string(source_rank), recv_wait_t, "s", 4, MetricMode::Average | MetricMode::Maximum | MetricMode::Minimum);
405 :
406 2 : TLOG(TLVL_DEBUG + 34) << "runReceiver_: Done sending receive stats for rank " << source_rank;
407 : }
408 :
409 1 : end_time = std::chrono::steady_clock::now();
410 : }
411 1 : else if (Fragment::isBroadcastFragmentType(header.type))
412 : {
413 2 : TLOG(TLVL_DEBUG + 32) << "Received System Fragment broadcast " << header.sequence_id << " from rank " << source_rank << " of type " << detail::RawFragmentHeader::SystemTypeToString(header.type) << ".";
414 :
415 1 : FragmentPtr frag(new Fragment(header.word_count - header.num_words()));
416 1 : memcpy(frag->headerAddress(), &header, header.num_words() * sizeof(RawDataType));
417 1 : auto ret3 = source_plugins_[source_rank]->receiveFragmentData(frag->headerAddress() + header.num_words(), header.word_count - header.num_words()); // NOLINT(cppcoreguidelines-pro-bounds-pointer-arithmetic)
418 1 : if (ret3 != source_rank)
419 : {
420 0 : TLOG(TLVL_ERROR) << "Unexpected return code from receiveFragmentData after receiveFragmentHeader while receiving System Fragment! (Expected: " << source_rank << ", Got: " << ret3 << ")";
421 0 : throw cet::exception("DataReceiverManager") << "Unexpected return code from receiveFragmentData after receiveFragmentHeader while receiving System Fragment! (Expected: " << source_rank << ", Got: " << ret3 << ")"; // NOLINT(cert-err60-cpp)
422 : }
423 :
424 1 : switch (header.type)
425 : {
426 1 : case Fragment::EndOfDataFragmentType:
427 1 : shm_manager_->setRequestMode(detail::RequestMessageMode::EndOfRun);
428 1 : if (endOfDataCount == static_cast<size_t>(-1))
429 : {
430 1 : endOfDataCount = *(frag->dataBegin());
431 : }
432 : else
433 : {
434 0 : endOfDataCount += *(frag->dataBegin());
435 : }
436 2 : TLOG(TLVL_DEBUG + 32) << "EndOfData Fragment indicates that " << endOfDataCount << " fragments are expected from rank " << source_rank
437 1 : << " (recvd " << recv_frag_count_.slotCount(source_rank) << ").";
438 1 : break;
439 0 : case Fragment::InitFragmentType:
440 0 : TLOG(TLVL_DEBUG + 32) << "Received Init Fragment from rank " << source_rank << ".";
441 0 : shm_manager_->setRequestMode(detail::RequestMessageMode::Normal);
442 0 : shm_manager_->AddInitFragment(frag);
443 0 : break;
444 0 : case Fragment::EndOfRunFragmentType:
445 0 : shm_manager_->setRequestMode(detail::RequestMessageMode::EndOfRun);
446 : // shm_manager_->endRun();
447 0 : break;
448 0 : case Fragment::EndOfSubrunFragmentType:
449 : // shm_manager_->setRequestMode(detail::RequestMessageMode::EndOfRun);
450 0 : TLOG(TLVL_DEBUG + 32) << "Received EndOfSubrun Fragment from rank " << source_rank
451 0 : << " with sequence_id " << header.sequence_id << " and timestamp " << header.timestamp << ".";
452 0 : if (header.sequence_id != Fragment::InvalidSequenceID)
453 : {
454 0 : shm_manager_->rolloverSubrun(header.sequence_id, header.timestamp, false);
455 : }
456 : else
457 : {
458 0 : shm_manager_->rolloverSubrun(recv_seq_count_.slotCount(source_rank), header.timestamp, false);
459 : }
460 0 : break;
461 0 : case Fragment::ShutdownFragmentType:
462 0 : shm_manager_->setRequestMode(detail::RequestMessageMode::EndOfRun);
463 0 : break;
464 0 : default:
465 0 : break;
466 : }
467 :
468 1 : if (header.type != Fragment::InitFragmentType && header.type != Fragment::EndOfDataFragmentType && header.type != Fragment::ShutdownFragmentType)
469 : {
470 0 : shm_manager_->BroadcastFragment(frag);
471 : }
472 1 : }
473 : }
474 :
475 1 : source_plugins_[source_rank]->flush_buffers();
476 :
477 2 : TLOG(TLVL_DEBUG + 32) << "runReceiver_ " << source_rank << " receive loop exited";
478 1 : running_sources_[source_rank] = false;
479 1 : }
|