Line data Source code
1 : #include "artdaq/DAQdata/Globals.hh"
2 : #define TRACE_NAME (app_name + "_TableReceiver").c_str()
3 : #include "artdaq/DAQrate/detail/TableReceiver.hh"
4 :
5 : #include "artdaq/DAQdata/TCPConnect.hh"
6 : #include "artdaq/DAQrate/detail/RoutingPacket.hh"
7 : #include "canvas/Utilities/Exception.h"
8 :
9 : #include <arpa/inet.h>
10 : #include <netinet/in.h>
11 : #include <poll.h>
12 : #include <sys/socket.h>
13 : #include <sys/types.h>
14 : #include <chrono>
15 :
16 0 : artdaq::TableReceiver::TableReceiver(const fhicl::ParameterSet& pset)
17 0 : : use_routing_manager_(pset.get<bool>("use_routing_manager", false))
18 0 : , should_stop_(false)
19 0 : , table_port_(pset.get<int>("table_update_port", 35556))
20 0 : , table_address_(pset.get<std::string>("routing_manager_hostname", "localhost"))
21 0 : , table_socket_(-1)
22 0 : , routing_table_last_(0)
23 0 : , routing_table_max_size_(pset.get<size_t>("routing_table_max_size", 1000))
24 0 : , routing_wait_time_(0)
25 0 : , routing_wait_time_count_(0)
26 0 : , routing_timeout_ms_((pset.get<size_t>("routing_timeout_ms", 1000)))
27 0 : , highest_sequence_id_routed_(0)
28 : {
29 0 : TLOG(TLVL_DEBUG + 32) << "Received pset: " << pset.to_string();
30 :
31 0 : if (use_routing_manager_)
32 : {
33 0 : startTableReceiverThread_();
34 : }
35 0 : }
36 :
37 0 : artdaq::TableReceiver::~TableReceiver()
38 : {
39 0 : TLOG(TLVL_DEBUG + 32) << "Shutting down TableReceiver BEGIN";
40 0 : should_stop_ = true;
41 0 : disconnectFromRoutingManager_();
42 :
43 0 : if (routing_thread_ != nullptr)
44 : {
45 : try
46 : {
47 0 : if (routing_thread_->joinable())
48 : {
49 0 : routing_thread_->join();
50 : }
51 : }
52 0 : catch (...)
53 : { // IGNORED
54 0 : }
55 : }
56 0 : TLOG(TLVL_DEBUG + 32) << "Shutting down TableReceiver END.";
57 0 : }
58 :
59 0 : artdaq::TableReceiver::RoutingTable artdaq::TableReceiver::GetRoutingTable() const
60 : {
61 0 : std::lock_guard<std::mutex> lk(routing_mutex_);
62 0 : RoutingTable routing_table_copy(routing_table_);
63 0 : return routing_table_copy;
64 0 : }
65 :
66 0 : artdaq::TableReceiver::RoutingTable artdaq::TableReceiver::GetAndClearRoutingTable()
67 : {
68 0 : std::lock_guard<std::mutex> lk(routing_mutex_);
69 0 : RoutingTable routing_table_copy(routing_table_);
70 0 : routing_table_.clear();
71 0 : return routing_table_copy;
72 0 : }
73 :
74 0 : int artdaq::TableReceiver::GetRoutingTableEntry(artdaq::Fragment::sequence_id_t seqID)
75 : {
76 0 : if (use_routing_manager_)
77 : {
78 0 : sendTableUpdateRequest_(seqID);
79 0 : auto routing_timeout_ms = routing_timeout_ms_;
80 0 : if (routing_timeout_ms == 0)
81 : {
82 0 : routing_timeout_ms = 3600 * 1000;
83 : }
84 0 : auto condition_wait = routing_timeout_ms > 10 ? std::chrono::milliseconds(10) : std::chrono::milliseconds(routing_timeout_ms);
85 0 : auto start_time = std::chrono::steady_clock::now();
86 0 : while (!should_stop_ && TimeUtils::GetElapsedTimeMilliseconds(start_time) < routing_timeout_ms)
87 : {
88 0 : std::unique_lock<std::mutex> lk(routing_mutex_);
89 0 : routing_cv_.wait_for(lk, condition_wait, [&]() { return routing_table_.count(seqID); });
90 0 : if (routing_table_.count(seqID))
91 : {
92 0 : routing_wait_time_.fetch_add(TimeUtils::GetElapsedTimeMicroseconds(start_time));
93 0 : return routing_table_.at(seqID);
94 : }
95 0 : }
96 0 : TLOG(TLVL_WARNING) << "Bad Omen: Timeout receiving routing information for " << seqID
97 0 : << " in routing_timeout_ms (" << routing_timeout_ms_ << " ms)!";
98 :
99 0 : routing_wait_time_.fetch_add(TimeUtils::GetElapsedTimeMicroseconds(start_time));
100 : }
101 0 : return ROUTING_FAILED;
102 : }
103 :
104 0 : void artdaq::TableReceiver::connectToRoutingManager_()
105 : {
106 0 : auto start_time = std::chrono::steady_clock::now();
107 0 : while (table_socket_ < 0 && TimeUtils::GetElapsedTime(start_time) < 30)
108 : {
109 0 : table_socket_ = TCPConnect(table_address_.c_str(), table_port_);
110 0 : if (table_socket_ < 0)
111 : {
112 0 : TLOG(TLVL_DEBUG + 33) << "Waited " << TimeUtils::GetElapsedTime(start_time) << " s for Routing Manager to open table listen socket";
113 0 : usleep(100000);
114 : }
115 : }
116 0 : if (table_socket_ < 0)
117 : {
118 0 : TLOG(TLVL_ERROR) << "Error creating socket for receiving table updates!";
119 0 : exit(1);
120 : }
121 :
122 0 : detail::RoutingRequest startHdr(my_rank);
123 0 : write(table_socket_, &startHdr, sizeof(startHdr));
124 0 : }
125 :
126 0 : void artdaq::TableReceiver::disconnectFromRoutingManager_()
127 : {
128 0 : detail::RoutingRequest endHdr(my_rank, detail::RoutingRequest::RequestMode::Disconnect);
129 0 : write(table_socket_, &endHdr, sizeof(endHdr));
130 0 : close(table_socket_);
131 0 : table_socket_ = -1;
132 0 : }
133 :
134 0 : void artdaq::TableReceiver::startTableReceiverThread_()
135 : {
136 0 : if (routing_thread_ != nullptr && routing_thread_->joinable())
137 : {
138 0 : routing_thread_->join();
139 : }
140 0 : TLOG(TLVL_INFO) << "Starting Routing Thread";
141 : try
142 : {
143 0 : routing_thread_.reset(new boost::thread(&TableReceiver::receiveTableUpdatesLoop_, this));
144 : char tname[16];
145 0 : snprintf(tname, 16, "%s", "RoutingReceive"); // NOLINT
146 0 : auto handle = routing_thread_->native_handle();
147 0 : pthread_setname_np(handle, tname);
148 : }
149 0 : catch (const boost::exception& e)
150 : {
151 0 : TLOG(TLVL_ERROR) << "Caught boost::exception starting Routing Table Receive thread: " << boost::diagnostic_information(e) << ", errno=" << errno;
152 0 : std::cerr << "Caught boost::exception starting Routing Table Receive thread: " << boost::diagnostic_information(e) << ", errno=" << errno << std::endl;
153 0 : exit(5);
154 0 : }
155 0 : }
156 :
157 0 : bool artdaq::TableReceiver::receiveTableUpdate_()
158 : {
159 0 : TLOG(TLVL_DEBUG + 33) << __func__ << ": Polling table socket for new routes (address:port = " << table_address_ << ":" << table_port_ << ")";
160 0 : if (table_socket_ == -1)
161 : {
162 0 : TLOG(TLVL_DEBUG + 32) << __func__ << ": Opening table socket";
163 0 : connectToRoutingManager_();
164 : }
165 0 : if (table_socket_ == -1)
166 : {
167 0 : TLOG(TLVL_DEBUG + 32) << __func__ << ": The table socket was not opened successfully.";
168 0 : return false;
169 : }
170 :
171 : struct pollfd fd;
172 0 : fd.fd = table_socket_;
173 0 : fd.events = POLLIN | POLLPRI;
174 :
175 0 : auto res = poll(&fd, 1, 1000);
176 0 : if (res > 0)
177 : {
178 0 : if (fd.revents & (POLLIN | POLLPRI))
179 : {
180 0 : TLOG(TLVL_DEBUG + 32) << __func__ << ": Going to receive RoutingPacketHeader";
181 0 : artdaq::detail::RoutingPacketHeader hdr;
182 0 : ssize_t stss = recv(table_socket_, &hdr, sizeof(hdr), MSG_WAITALL);
183 0 : if (stss != sizeof(hdr))
184 : {
185 0 : TLOG(TLVL_ERROR) << "Error reading Table Header from Table socket, errno=" << errno << " (" << strerror(errno) << ")";
186 0 : disconnectFromRoutingManager_();
187 0 : return false;
188 : }
189 :
190 0 : TLOG(TLVL_DEBUG + 32) << "receiveTableUpdatesLoop_: Checking for valid header with nEntries=" << hdr.nEntries << " header=" << std::hex << hdr.header;
191 0 : if (hdr.header != ROUTING_MAGIC)
192 : {
193 0 : TLOG(TLVL_DEBUG + 33) << __func__ << ": non-RoutingPacket received. No ROUTING_MAGIC.";
194 0 : return false;
195 : }
196 0 : if (hdr.nEntries == 0)
197 : {
198 0 : TLOG(TLVL_DEBUG + 33) << __func__ << ": Empty Routing Table update received.";
199 0 : return false;
200 : }
201 :
202 0 : artdaq::detail::RoutingPacket buffer(hdr.nEntries);
203 0 : size_t sts = 0;
204 0 : size_t total = sizeof(artdaq::detail::RoutingPacketEntry) * hdr.nEntries;
205 0 : while (sts < total)
206 : {
207 0 : stss = read(table_socket_, reinterpret_cast<char*>(&buffer[0]) + sts, total - sts);
208 0 : sts += stss;
209 0 : TLOG(TLVL_DEBUG + 32) << "Read " << stss << " bytes, total " << sts << " / " << total;
210 0 : if (stss < 0)
211 : {
212 0 : TLOG(TLVL_ERROR) << "Error reading Table Data from Table socket, errno=" << errno << " (" << strerror(errno) << ")";
213 0 : disconnectFromRoutingManager_();
214 0 : return false;
215 : }
216 : }
217 :
218 0 : auto first = buffer.front().sequence_id;
219 0 : auto last = buffer.back().sequence_id;
220 :
221 0 : if (first + hdr.nEntries - 1 != last)
222 : {
223 0 : TLOG(TLVL_ERROR) << __func__ << ": Skipping this RoutingPacket because the first (" << first << ") and last (" << last << ") entries are inconsistent (sz=" << hdr.nEntries << ")!";
224 0 : return false;
225 : }
226 :
227 0 : auto thisSeqID = first;
228 :
229 : {
230 0 : std::lock_guard<std::mutex> lck(routing_mutex_);
231 0 : if (routing_table_.count(last) == 0)
232 : {
233 0 : for (auto entry : buffer)
234 : {
235 0 : if (thisSeqID != entry.sequence_id)
236 : {
237 0 : TLOG(TLVL_ERROR) << __func__ << ": Aborting processing of this RoutingPacket because I encountered an inconsistent entry (seqid=" << entry.sequence_id << ", expected=" << thisSeqID << ")!";
238 0 : last = thisSeqID - 1;
239 0 : break;
240 : }
241 0 : thisSeqID++;
242 0 : if (routing_table_.count(entry.sequence_id) != 0u)
243 : {
244 0 : if (routing_table_[entry.sequence_id] != entry.destination_rank)
245 : {
246 0 : TLOG(TLVL_ERROR) << __func__ << ": Detected routing table corruption! Recevied update specifying that sequence ID " << entry.sequence_id
247 0 : << " should go to rank " << entry.destination_rank << ", but I had already been told to send it to " << routing_table_[entry.sequence_id] << "!"
248 0 : << " I will use the original value!";
249 : }
250 0 : continue;
251 0 : }
252 0 : if (entry.sequence_id < routing_table_last_)
253 : {
254 0 : continue;
255 : }
256 0 : routing_table_[entry.sequence_id] = entry.destination_rank;
257 0 : TLOG(TLVL_DEBUG + 32) << __func__ << ": (my_rank=" << my_rank << ") received update: SeqID " << entry.sequence_id
258 0 : << " -> Rank " << entry.destination_rank;
259 : }
260 : }
261 :
262 0 : TLOG(TLVL_DEBUG + 32) << __func__ << ": There are now " << routing_table_.size() << " entries in the Routing Table";
263 0 : if (!routing_table_.empty())
264 : {
265 0 : TLOG(TLVL_DEBUG + 32) << __func__ << ": Last routing table entry is seqID=" << routing_table_.rbegin()->first;
266 : }
267 :
268 0 : auto counter = 0;
269 0 : for (auto& entry : routing_table_)
270 : {
271 0 : TLOG(TLVL_DEBUG + 40) << "Routing Table Entry" << counter << ": " << entry.first << " -> " << entry.second;
272 0 : counter++;
273 : }
274 0 : }
275 0 : routing_cv_.notify_all();
276 :
277 0 : SendMetrics();
278 0 : return true;
279 0 : }
280 : else
281 : {
282 0 : TLOG(TLVL_DEBUG + 32) << "Poll indicates socket closure. Disconnecting from Routing Manager";
283 0 : disconnectFromRoutingManager_();
284 0 : return false;
285 : }
286 : }
287 0 : return false;
288 : }
289 :
290 0 : void artdaq::TableReceiver::receiveTableUpdatesLoop_()
291 : {
292 : while (true)
293 : {
294 0 : if (should_stop_)
295 : {
296 0 : TLOG(TLVL_DEBUG + 32) << __func__ << ": should_stop is " << std::boolalpha << should_stop_ << ", stopping";
297 0 : disconnectFromRoutingManager_();
298 0 : return;
299 : }
300 :
301 0 : receiveTableUpdate_();
302 0 : }
303 : }
304 :
305 0 : void artdaq::TableReceiver::sendTableUpdateRequest_(Fragment::sequence_id_t seq)
306 : {
307 0 : TLOG(TLVL_DEBUG + 33) << "sendTableUpdateRequest_ BEGIN";
308 : {
309 0 : std::lock_guard<std::mutex> lck(routing_mutex_);
310 0 : if (routing_table_.count(seq))
311 : {
312 0 : TLOG(TLVL_DEBUG + 33) << "sendTableUpdateRequest_ END (no request sent): " << routing_table_.at(seq);
313 0 : return;
314 : }
315 0 : }
316 0 : if (table_socket_ == -1)
317 : {
318 0 : connectToRoutingManager_();
319 : }
320 :
321 0 : TLOG(TLVL_DEBUG + 32) << "sendTableUpdateRequest_: Sending table update request for " << my_rank << ", sequence ID " << seq;
322 0 : detail::RoutingRequest pkt(my_rank, seq);
323 0 : write(table_socket_, &pkt, sizeof(pkt));
324 :
325 0 : TLOG(TLVL_DEBUG + 33) << "sendTableUpdateRequest_ END";
326 : }
327 :
328 0 : size_t artdaq::TableReceiver::GetRoutingTableEntryCount() const
329 : {
330 0 : std::lock_guard<std::mutex> lck(routing_mutex_);
331 0 : return routing_table_.size();
332 0 : }
333 :
334 0 : size_t artdaq::TableReceiver::GetRemainingRoutingTableEntries() const
335 : {
336 0 : std::lock_guard<std::mutex> lck(routing_mutex_);
337 : // Find the distance from the next highest sequence ID to the end of the list
338 0 : size_t dist = std::distance(routing_table_.upper_bound(highest_sequence_id_routed_), routing_table_.end());
339 0 : return dist; // If dist == 1, there is one entry left.
340 0 : }
341 :
342 0 : void artdaq::TableReceiver::RemoveRoutingTableEntry(Fragment::sequence_id_t seq)
343 : {
344 0 : TLOG(TLVL_DEBUG + 35) << "RemoveRoutingTableEntry: Removing sequence ID " << seq << " from routing table.";
345 0 : std::lock_guard<std::mutex> lck(routing_mutex_);
346 : // while (routing_table_.size() > routing_table_max_size_)
347 : // {
348 : // routing_table_.erase(routing_table_.begin());
349 : // }
350 0 : if (routing_table_.find(seq) != routing_table_.end())
351 : {
352 0 : routing_table_.erase(routing_table_.find(seq));
353 : }
354 0 : }
355 :
356 0 : void artdaq::TableReceiver::SendMetrics() const
357 : {
358 0 : if (metricMan)
359 : {
360 0 : TLOG(TLVL_DEBUG + 34) << "sending metrics";
361 0 : if (use_routing_manager_)
362 : {
363 0 : metricMan->sendMetric("Routing Table Size", GetRoutingTableEntryCount(), "events", 2, MetricMode::LastPoint);
364 0 : if (routing_wait_time_ > 0)
365 : {
366 0 : metricMan->sendMetric("Routing Wait Time", static_cast<double>(routing_wait_time_.load()) / 1000000, "s", 2, MetricMode::Average);
367 0 : routing_wait_time_ = 0;
368 : }
369 : }
370 : }
371 0 : }
|