Line data Source code
1 : #include "artdaq/DAQdata/Globals.hh"
2 : #define TRACE_NAME (app_name + "_DataSenderManager").c_str()
3 : #include "artdaq/DAQdata/HostMap.hh"
4 : #include "artdaq/DAQrate/DataSenderManager.hh"
5 : #include "artdaq/TransferPlugins/MakeTransferPlugin.hh"
6 :
7 : #include <arpa/inet.h>
8 : #include <netinet/in.h>
9 : #include <poll.h>
10 : #include <sys/socket.h>
11 : #include <sys/types.h>
12 : #include <chrono>
13 : #include "artdaq/DAQdata/TCPConnect.hh"
14 : #include "artdaq/DAQrate/detail/MergeParameterSets.hh"
15 : #include "canvas/Utilities/Exception.h"
16 :
17 0 : artdaq::DataSenderManager::DataSenderManager(const fhicl::ParameterSet& pset)
18 0 : : sent_frag_count_()
19 0 : , broadcast_sends_(pset.get<bool>("broadcast_sends", false))
20 0 : , non_blocking_mode_(pset.get<bool>("nonblocking_sends", false))
21 0 : , send_timeout_us_(pset.get<size_t>("send_timeout_usec", 5000000))
22 0 : , send_retry_count_(pset.get<size_t>("send_retry_count", 2))
23 0 : , should_stop_(false)
24 0 : , highest_sequence_id_routed_(0)
25 : {
26 0 : TLOG(TLVL_DEBUG + 32) << "Received pset: " << pset.to_string();
27 :
28 : // Validate parameters
29 0 : if (send_timeout_us_ == 0)
30 : {
31 0 : send_timeout_us_ = std::numeric_limits<size_t>::max();
32 : }
33 :
34 0 : auto rmConfig = pset.get<fhicl::ParameterSet>("routing_table_config", fhicl::ParameterSet());
35 0 : table_receiver_.reset(new TableReceiver(rmConfig));
36 :
37 0 : hostMap_t host_map = MakeHostMap(pset);
38 0 : auto max_fragment_size_words = pset.get<size_t>("max_fragment_size_words", 0);
39 0 : auto transfer_parameters = pset.get<fhicl::ParameterSet>("transfer_parameters", fhicl::ParameterSet());
40 :
41 0 : auto dests = pset.get<fhicl::ParameterSet>("destinations", fhicl::ParameterSet());
42 0 : for (auto& d : dests.get_pset_names())
43 : {
44 0 : auto dest_pset = dests.get<fhicl::ParameterSet>(d);
45 0 : host_map = MakeHostMap(dest_pset, host_map);
46 0 : }
47 0 : auto host_map_pset = MakeHostMapPset(host_map);
48 0 : fhicl::ParameterSet dests_mod;
49 0 : for (auto& d : dests.get_pset_names())
50 : {
51 0 : auto dest_pset = dests.get<fhicl::ParameterSet>(d);
52 0 : dest_pset.erase("host_map");
53 0 : dest_pset.put<std::vector<fhicl::ParameterSet>>("host_map", host_map_pset);
54 :
55 0 : if (max_fragment_size_words != 0 && !dest_pset.has_key("max_fragment_size_words"))
56 : {
57 0 : dest_pset.put<size_t>("max_fragment_size_words", max_fragment_size_words);
58 : }
59 :
60 0 : auto resultant_set = merge(transfer_parameters, dest_pset);
61 :
62 0 : dests_mod.put<fhicl::ParameterSet>(d, resultant_set);
63 0 : }
64 :
65 0 : for (auto& d : dests_mod.get_pset_names())
66 : {
67 : try
68 : {
69 0 : auto transfer = MakeTransferPlugin(dests_mod, d, TransferInterface::Role::kSend);
70 0 : auto destination_rank = transfer->destination_rank();
71 0 : destinations_.emplace(destination_rank, std::move(transfer));
72 0 : }
73 0 : catch (const std::invalid_argument&)
74 : {
75 0 : TLOG(TLVL_DEBUG + 32) << "Invalid destination specification: " << d;
76 0 : }
77 0 : catch (const cet::exception& ex)
78 : {
79 0 : TLOG(TLVL_WARNING) << "Caught cet::exception: " << ex.what();
80 0 : }
81 0 : catch (...)
82 : {
83 0 : TLOG(TLVL_WARNING) << "Non-cet exception while setting up TransferPlugin: " << d << ".";
84 0 : }
85 0 : }
86 0 : if (destinations_.empty())
87 : {
88 0 : TLOG(TLVL_ERROR) << "No destinations specified!";
89 : }
90 : else
91 : {
92 0 : auto enabled_dests = pset.get<std::vector<size_t>>("enabled_destinations", std::vector<size_t>());
93 0 : if (enabled_dests.empty())
94 : {
95 0 : TLOG(TLVL_INFO) << "enabled_destinations not specified, assuming all destinations enabled.";
96 0 : for (auto& d : destinations_)
97 : {
98 0 : enabled_destinations_.insert(d.first);
99 : }
100 : }
101 : else
102 : {
103 0 : for (auto& d : enabled_dests)
104 : {
105 0 : enabled_destinations_.insert(d);
106 : }
107 : }
108 0 : }
109 0 : }
110 :
111 0 : artdaq::DataSenderManager::~DataSenderManager()
112 : {
113 0 : TLOG(TLVL_DEBUG + 32) << "Shutting down DataSenderManager BEGIN";
114 0 : should_stop_ = true;
115 0 : for (auto& dest : enabled_destinations_)
116 : {
117 0 : if (destinations_.count(dest) != 0u)
118 : {
119 0 : auto sts = destinations_[dest]->transfer_fragment_reliable_mode(std::move(*Fragment::eodFrag(sent_frag_count_.slotCount(dest))));
120 0 : if (sts != TransferInterface::CopyStatus::kSuccess)
121 : {
122 0 : TLOG(TLVL_ERROR) << "Error sending EOD Fragment to sender rank " << dest;
123 : }
124 : // sendFragTo(std::move(*Fragment::eodFrag(nFragments)), dest, true);
125 : }
126 : }
127 0 : TLOG(TLVL_DEBUG + 32) << "Shutting down DataSenderManager END. Sent " << count() << " fragments.";
128 0 : }
129 :
130 0 : size_t artdaq::DataSenderManager::GetRoutingTableEntryCount() const
131 : {
132 0 : return table_receiver_->GetRoutingTableEntryCount();
133 : }
134 :
135 0 : size_t artdaq::DataSenderManager::GetRemainingRoutingTableEntries() const
136 : {
137 0 : return table_receiver_->GetRemainingRoutingTableEntries();
138 : }
139 :
140 0 : int artdaq::DataSenderManager::calcDest_(Fragment::sequence_id_t sequence_id) const
141 : {
142 0 : if (enabled_destinations_.empty())
143 : {
144 0 : return TableReceiver::ROUTING_FAILED; // No destinations configured.
145 : }
146 :
147 0 : if (table_receiver_->RoutingManagerEnabled())
148 : {
149 0 : TLOG(TLVL_DEBUG + 35) << "calcDest_ use_routing_manager check for routing info for seqID=" << sequence_id << " should_stop_=" << should_stop_;
150 0 : return table_receiver_->GetRoutingTableEntry(sequence_id);
151 : }
152 0 : if (enabled_destinations_.size() == 1)
153 : {
154 0 : return *enabled_destinations_.begin(); // Trivial case
155 : }
156 0 : auto index = sequence_id % enabled_destinations_.size();
157 0 : auto it = enabled_destinations_.begin();
158 0 : for (; index > 0; --index)
159 : {
160 0 : ++it;
161 0 : if (it == enabled_destinations_.end())
162 : {
163 0 : it = enabled_destinations_.begin();
164 : }
165 : }
166 0 : return *it;
167 : }
168 :
169 0 : void artdaq::DataSenderManager::RemoveRoutingTableEntry(Fragment::sequence_id_t seq)
170 : {
171 0 : TLOG(TLVL_DEBUG + 35) << "RemoveRoutingTableEntry: Removing sequence ID " << seq << " from routing table. Sent " << GetSentSequenceIDCount(seq) << " Fragments with this Sequence ID.";
172 0 : table_receiver_->RemoveRoutingTableEntry(seq);
173 :
174 0 : std::unique_lock<std::mutex> lck(sent_sequence_id_mutex_);
175 0 : if (sent_sequence_id_count_.find(seq) != sent_sequence_id_count_.end())
176 : {
177 0 : sent_sequence_id_count_.erase(sent_sequence_id_count_.find(seq));
178 : }
179 0 : }
180 :
181 0 : size_t artdaq::DataSenderManager::GetSentSequenceIDCount(Fragment::sequence_id_t seq)
182 : {
183 0 : std::unique_lock<std::mutex> lck(sent_sequence_id_mutex_);
184 0 : if (sent_sequence_id_count_.count(seq) == 0u)
185 : {
186 0 : return 0;
187 : }
188 0 : return sent_sequence_id_count_[seq];
189 0 : }
190 :
191 0 : std::pair<int, artdaq::TransferInterface::CopyStatus> artdaq::DataSenderManager::sendFragment(Fragment&& frag)
192 : {
193 : // Precondition: Fragment must be complete and consistent (including
194 : // header information).
195 0 : auto start_time = std::chrono::steady_clock::now();
196 0 : if (frag.type() == Fragment::EndOfDataFragmentType)
197 : {
198 0 : throw cet::exception("LogicError") // NOLINT(cert-err60-cpp)
199 : << "EOD fragments should not be sent on as received: "
200 0 : << "use sendEODFrag() instead.";
201 : }
202 0 : size_t seqID = frag.sequenceID();
203 0 : size_t fragSize = frag.sizeBytes();
204 0 : auto latency_s = frag.getLatency(true);
205 0 : auto isSystemBroadcast = Fragment::isBroadcastFragmentType(frag.type());
206 :
207 0 : double latency = latency_s.tv_sec + (latency_s.tv_nsec / 1000000000.0);
208 0 : TLOG(TLVL_DEBUG + 36) << "sendFragment start frag.fragmentHeader()=" << std::hex << static_cast<void*>(frag.headerBeginBytes()) << ", szB=" << std::dec << fragSize
209 0 : << ", seqID=" << seqID << ", fragID=" << frag.fragmentID() << ", type=" << frag.typeString();
210 0 : int dest = TableReceiver::ROUTING_FAILED;
211 0 : auto outsts = TransferInterface::CopyStatus::kSuccess;
212 0 : if (broadcast_sends_ || isSystemBroadcast)
213 : {
214 0 : for (auto& bdest : enabled_destinations_)
215 : {
216 0 : TLOG(TLVL_DEBUG + 33) << "sendFragment: Sending fragment with seqId " << seqID << " to destination " << bdest << " (broadcast)";
217 : // Gross, we have to copy.
218 0 : auto sts = TransferInterface::CopyStatus::kTimeout;
219 0 : size_t retries = 0; // Have NOT yet tried, so retries <= send_retry_count_ will have it RETRY send_retry_count_ times
220 0 : while (sts == TransferInterface::CopyStatus::kTimeout && retries <= send_retry_count_)
221 : {
222 0 : if (!non_blocking_mode_)
223 : {
224 0 : sts = destinations_[bdest]->transfer_fragment_reliable_mode(Fragment(frag));
225 : }
226 : else
227 : {
228 0 : sts = destinations_[bdest]->transfer_fragment_min_blocking_mode(frag, send_timeout_us_);
229 : }
230 0 : ++retries;
231 : }
232 0 : if (sts != TransferInterface::CopyStatus::kSuccess)
233 : {
234 0 : outsts = sts;
235 : }
236 0 : sent_frag_count_.incSlot(bdest);
237 : }
238 0 : }
239 0 : else if (non_blocking_mode_)
240 : {
241 0 : dest = calcDest_(seqID);
242 0 : if (dest == TableReceiver::ROUTING_FAILED)
243 : {
244 0 : TLOG(TLVL_WARNING) << "Could not get destination for seqID " << seqID;
245 : }
246 :
247 0 : if (dest != TableReceiver::ROUTING_FAILED && (destinations_.count(dest) != 0u) && (enabled_destinations_.count(dest) != 0u))
248 : {
249 0 : TLOG(TLVL_DEBUG + 33) << "sendFragment: Sending fragment with seqId " << seqID << " to destination " << dest;
250 0 : TransferInterface::CopyStatus sts = TransferInterface::CopyStatus::kErrorNotRequiringException;
251 0 : auto lastWarnTime = std::chrono::steady_clock::now();
252 0 : size_t retries = 0; // Have NOT yet tried, so retries <= send_retry_count_ will have it RETRY send_retry_count_ times
253 0 : while (sts != TransferInterface::CopyStatus::kSuccess && retries <= send_retry_count_)
254 : {
255 0 : sts = destinations_[dest]->transfer_fragment_min_blocking_mode(frag, send_timeout_us_);
256 0 : if (sts != TransferInterface::CopyStatus::kSuccess && TimeUtils::GetElapsedTime(lastWarnTime) >= 1)
257 : {
258 0 : TLOG(TLVL_WARNING) << "sendFragment: Sending fragment " << seqID << " to destination " << dest << " failed! Retrying...";
259 0 : lastWarnTime = std::chrono::steady_clock::now();
260 : }
261 0 : ++retries;
262 : }
263 0 : if (sts != TransferInterface::CopyStatus::kSuccess)
264 : {
265 0 : outsts = sts;
266 : }
267 : // sendFragTo(std::move(frag), dest);
268 0 : sent_frag_count_.incSlot(dest);
269 : }
270 0 : else if (!should_stop_)
271 : {
272 0 : TLOG(TLVL_ERROR) << "(in non_blocking) calcDest returned invalid destination rank " << dest << "! This event has been lost: " << seqID
273 0 : << ". enabled_destinantions_.size()=" << enabled_destinations_.size();
274 : }
275 : }
276 : else
277 : {
278 0 : auto start = std::chrono::steady_clock::now();
279 0 : while (!should_stop_ && dest == TableReceiver::ROUTING_FAILED)
280 : {
281 0 : dest = calcDest_(seqID);
282 0 : if (dest == TableReceiver::ROUTING_FAILED)
283 : {
284 0 : TLOG(TLVL_WARNING) << "Could not get destination for seqID " << seqID << ", send number " << sent_frag_count_.count() << ", retrying. Waited " << TimeUtils::GetElapsedTime(start) << " s for routing information.";
285 0 : usleep(10000);
286 : }
287 : }
288 0 : if (dest != TableReceiver::ROUTING_FAILED && (destinations_.count(dest) != 0u) && (enabled_destinations_.count(dest) != 0u))
289 : {
290 0 : TLOG(TLVL_DEBUG + 34) << "DataSenderManager::sendFragment: Sending fragment with seqId " << seqID << " to destination " << dest;
291 0 : TransferInterface::CopyStatus sts = TransferInterface::CopyStatus::kErrorNotRequiringException;
292 :
293 0 : sts = destinations_[dest]->transfer_fragment_reliable_mode(std::move(frag));
294 0 : if (sts != TransferInterface::CopyStatus::kSuccess)
295 : {
296 0 : TLOG(TLVL_ERROR) << "sendFragment: Sending fragment " << seqID << " to destination "
297 0 : << dest << " failed! Data has been lost!";
298 : }
299 :
300 : // sendFragTo(std::move(frag), dest);
301 0 : sent_frag_count_.incSlot(dest);
302 0 : outsts = sts;
303 : }
304 0 : else if (!should_stop_)
305 : {
306 0 : TLOG(TLVL_ERROR) << "calcDest returned invalid destination rank " << dest << "! This event has been lost: " << seqID
307 0 : << ". enabled_destinantions_.size()=" << enabled_destinations_.size();
308 : }
309 : }
310 :
311 0 : if (!isSystemBroadcast)
312 : {
313 0 : std::unique_lock<std::mutex> lck(sent_sequence_id_mutex_);
314 0 : sent_sequence_id_count_[seqID]++;
315 0 : }
316 :
317 0 : auto delta_t = TimeUtils::GetElapsedTime(start_time);
318 :
319 0 : if (metricMan)
320 : {
321 0 : TLOG(TLVL_DEBUG + 34) << "sendFragment: sending metrics";
322 0 : metricMan->sendMetric("Data Send Time to Rank " + std::to_string(dest), delta_t, "s", 5, MetricMode::Accumulate);
323 0 : metricMan->sendMetric("Data Send Size to Rank " + std::to_string(dest), fragSize, "B", 5, MetricMode::Accumulate | MetricMode::Maximum);
324 0 : metricMan->sendMetric("Data Send Rate to Rank " + std::to_string(dest), fragSize / delta_t, "B/s", 5, MetricMode::Average);
325 0 : metricMan->sendMetric("Data Send Count to Rank " + std::to_string(dest), sent_frag_count_.slotCount(dest), "fragments", 3, MetricMode::LastPoint);
326 :
327 0 : metricMan->sendMetric("Rank", std::to_string(my_rank), "", 3, MetricMode::LastPoint);
328 0 : metricMan->sendMetric("App Name", app_name, "", 3, MetricMode::LastPoint);
329 :
330 0 : metricMan->sendMetric("Fragment Latency at Send", latency, "s", 4, MetricMode::Average | MetricMode::Maximum);
331 : }
332 :
333 0 : TLOG(TLVL_DEBUG + 34) << "sendFragment: Done sending fragment " << seqID << " to dest=" << dest;
334 0 : return std::make_pair(dest, outsts);
335 : } // artdaq::DataSenderManager::sendFragment
|