Line data Source code
1 : // Sep 14, 2016. "TERMS AND CONDITIONS" governing this file are in the README
2 : // or COPYING file. If you do not have such a file, one can be obtained by
3 : // contacting Ron or Fermi Lab in Batavia IL, 60510, phone: 630-840-3000.
4 : // $RCSfile: .emacs.gnu,v $
5 : // rev="$Revision: 1.30 $$Date: 2016/03/01 14:27:27 $";
6 :
7 : // C Includes
8 : #include <arpa/inet.h> // ntohl, ntohs
9 : #include <poll.h> // struct pollfd
10 : #include <sys/socket.h> // socket, socklen_t
11 : #include <sys/types.h> // size_t
12 : #include <sys/un.h> // sockaddr_un
13 : #include <cstdlib> // atoi, strtoul
14 :
15 : // C++ Includes
16 : #include <atomic>
17 : #include <fstream>
18 : #include <memory>
19 : #include <mutex>
20 : #include <stdexcept>
21 : #include <string>
22 :
23 : // product Includes
24 : #include "artdaq/DAQdata/Globals.hh"
25 : #define TRACE_NAME (app_name + "_TCPSocketTransfer").c_str()
26 :
27 : // artdaq Includes
28 : #include <iomanip>
29 : #include "artdaq-core/Data/Fragment.hh"
30 : #include "artdaq-core/Utilities/TimeUtils.hh"
31 : #include "artdaq/DAQdata/TCPConnect.hh"
32 : #include "artdaq/DAQdata/TCP_listen_fd.hh"
33 : #include "artdaq/TransferPlugins/TCPSocketTransfer.hh"
34 : #include "artdaq/TransferPlugins/detail/SRSockets.hh"
35 : #include "artdaq/TransferPlugins/detail/Timeout.hh"
36 :
37 : #define USE_SENDMSG 1
38 :
39 : std::atomic<int> artdaq::TCPSocketTransfer::listen_thread_refcount_(0);
40 : std::unique_ptr<boost::thread> artdaq::TCPSocketTransfer::listen_thread_ = nullptr;
41 : std::map<int, std::set<int>> artdaq::TCPSocketTransfer::connected_fds_ = std::map<int, std::set<int>>();
42 : std::map<int, std::set<int>> artdaq::TCPSocketTransfer::first_fragment_received_ = std::map<int, std::set<int>>();
43 : std::mutex artdaq::TCPSocketTransfer::listen_thread_mutex_;
44 : std::mutex artdaq::TCPSocketTransfer::fd_mutex_;
45 :
46 3 : artdaq::TCPSocketTransfer::
47 3 : TCPSocketTransfer(fhicl::ParameterSet const& pset, TransferInterface::Role role)
48 : : TransferInterface(pset, role)
49 3 : , send_fd_(-1)
50 6 : , port_(pset.get<int>("port", portMan->GetTCPSocketTransferPort(destination_rank())))
51 6 : , rcvbuf_(pset.get<size_t>("tcp_receive_buffer_size", 0))
52 6 : , sndbuf_(pset.get<size_t>("tcp_send_buffer_size", max_fragment_size_words_ * sizeof(artdaq::RawDataType) * buffer_count_))
53 6 : , send_retry_timeout_us_(pset.get<size_t>("send_retry_timeout_us", 1000000))
54 3 : , timeoutMessageArmed_(true)
55 6 : , receive_disconnected_wait_s_(pset.get<double>("receive_socket_disconnected_wait_s", 10.0))
56 6 : , receive_err_wait_us_(pset.get<size_t>("receive_socket_disconnected_wait_us", 10000))
57 3 : , receive_socket_has_been_connected_(false)
58 9 : , send_ack_diff_(0)
59 : {
60 6 : TLOG(TLVL_DEBUG + 32) << GetTraceName() << " Constructor: pset=" << pset.to_string() << ", role=" << (role == TransferInterface::Role::kReceive ? "kReceive" : "kSend");
61 3 : connection_was_lost_ = false;
62 :
63 3 : if (role == TransferInterface::Role::kReceive)
64 : {
65 : // Wait for sender to connect...
66 4 : TLOG(TLVL_DEBUG + 32) << GetTraceName() << "Listening for connections";
67 2 : start_listen_thread_();
68 4 : TLOG(TLVL_DEBUG + 32) << GetTraceName() << "Done Listening";
69 : }
70 : else
71 : {
72 1 : hostMap_ = MakeHostMap(pset);
73 2 : TLOG(TLVL_DEBUG + 32) << GetTraceName() << "Connecting to destination";
74 1 : connect_();
75 2 : TLOG(TLVL_DEBUG + 32) << GetTraceName() << "Done Connecting";
76 : }
77 6 : TLOG(TLVL_DEBUG + 32) << GetTraceName() << "End of Constructor";
78 3 : }
79 :
80 5 : artdaq::TCPSocketTransfer::~TCPSocketTransfer() noexcept
81 : {
82 6 : TLOG(TLVL_DEBUG + 32) << GetTraceName() << "Shutting down TCPSocketTransfer";
83 :
84 3 : if (role() == TransferInterface::Role::kSend)
85 : {
86 : // close all open connections (send stop_v0) first
87 1 : MessHead mh = {0, MessHead::stop_v0, htons(TransferInterface::source_rank()), {0}};
88 1 : if (send_fd_ != -1)
89 : {
90 : // should be blocking with modest timeo
91 1 : timeval tv = {0, 100000};
92 1 : socklen_t len = sizeof(tv);
93 1 : setsockopt(send_fd_, SOL_SOCKET, SO_SNDTIMEO, &tv, len);
94 1 : write(send_fd_, &mh, sizeof(mh));
95 : }
96 1 : close(send_fd_);
97 1 : send_fd_ = -1;
98 : }
99 : else
100 : {
101 2 : TCPSocketTransfer::flush_buffers();
102 : try
103 : {
104 2 : if (ack_listen_thread_ && ack_listen_thread_->joinable())
105 : {
106 0 : ack_listen_thread_->join();
107 : }
108 : }
109 0 : catch (...)
110 : {
111 : // IGNORED
112 0 : }
113 :
114 2 : std::lock_guard<std::mutex> lk(listen_thread_mutex_);
115 2 : listen_thread_refcount_--;
116 : try
117 : {
118 2 : if (listen_thread_refcount_ <= 0 && listen_thread_ && listen_thread_->joinable())
119 : {
120 2 : listen_thread_->join();
121 : }
122 : }
123 0 : catch (...)
124 : {
125 : // IGNORED
126 0 : }
127 2 : }
128 :
129 6 : TLOG(TLVL_DEBUG + 32) << GetTraceName() << "End of Destructor";
130 5 : }
131 :
132 12 : int artdaq::TCPSocketTransfer::receiveFragmentHeader(detail::RawFragmentHeader& header, size_t timeout_usec)
133 : {
134 24 : TLOG(TLVL_DEBUG + 34) << GetTraceName() << "receiveFragmentHeader BEGIN";
135 12 : int ret_rank = RECV_TIMEOUT;
136 :
137 : // Don't bomb out until received at least one connection...
138 12 : if (getConnectedFDCount_(source_rank()) == 0)
139 : { // what if just listen_fd???
140 : // if (receive_socket_has_been_connected_ && TimeUtils::GetElapsedTime(last_recv_time_) > receive_disconnected_wait_s_)
141 : // {
142 : // TLOG(TLVL_ERROR) << GetTraceName() << "receiveFragmentHeader: senders have been disconnected for "
143 : // << TimeUtils::GetElapsedTime(last_recv_time_) << " s (receive_socket_disconnected_wait_s = " << receive_disconnected_wait_s_ << " s). RETURNING DATA_END!";
144 : // return DATA_END;
145 : // }
146 : // if (++not_connected_count_ > receive_err_threshold_) { return DATA_END; }
147 0 : TLOG(TLVL_DEBUG + 36) << GetTraceName() << "Receive socket not connected, returning RECV_TIMEOUT";
148 0 : usleep(receive_err_wait_us_);
149 0 : return RECV_TIMEOUT;
150 : }
151 12 : receive_socket_has_been_connected_ = true;
152 12 : last_recv_time_ = std::chrono::steady_clock::now();
153 :
154 24 : TLOG(TLVL_DEBUG + 34) << GetTraceName() << "timeout_usec=" << timeout_usec;
155 : // void* buff=alloca(max_fragment_size_words_*8);
156 12 : size_t byte_cnt = 0;
157 : int sts;
158 12 : int offset = 0;
159 12 : SocketState state = SocketState::Metadata;
160 12 : int target_bytes = sizeof(MessHead);
161 12 : uint64_t start_time_us = TimeUtils::gettimeofday_us();
162 :
163 : // while (active_receive_fd_ != -1)
164 : //{
165 : // TLOG(TLVL_DEBUG + 33) << GetTraceName() << "Currently receiving from fd " << active_receive_fd_ << ", waiting!";
166 : // usleep(1000);
167 : // }
168 :
169 : uint8_t* buff;
170 :
171 : int timeout_ms;
172 12 : if (timeout_usec == 0)
173 : {
174 0 : timeout_ms = 0;
175 : }
176 : else
177 : {
178 12 : timeout_ms = (timeout_usec + 999) / 1000; // want at least 1 ms
179 : }
180 :
181 12 : bool done = false;
182 12 : bool noDataWarningSent = false;
183 12 : int loop_guard = 0;
184 :
185 16 : while (!done && getConnectedFDCount_(source_rank()) > 0)
186 : {
187 14 : if (getActiveFD_(source_rank()) == -1)
188 : {
189 12 : loop_guard = 0;
190 12 : size_t fd_count = 0;
191 12 : std::vector<pollfd> pollfds;
192 : {
193 12 : std::lock_guard<std::mutex> lk(fd_mutex_);
194 12 : fd_count = connected_fds_[source_rank()].size();
195 12 : bool first = false;
196 12 : if (first_fragment_received_[source_rank()].size() < connected_fds_[source_rank()].size())
197 : {
198 1 : fd_count -= first_fragment_received_[source_rank()].size();
199 1 : first = true;
200 : }
201 12 : pollfds.resize(fd_count);
202 12 : auto iter = connected_fds_[source_rank()].begin();
203 24 : for (size_t ii = 0; ii < fd_count; ++ii)
204 : {
205 12 : while (first && first_fragment_received_[source_rank()].count(*iter))
206 : {
207 0 : ++iter;
208 : }
209 12 : pollfds[ii].events = POLLIN | POLLPRI | POLLERR;
210 12 : pollfds[ii].fd = *iter;
211 12 : ++iter;
212 : }
213 12 : }
214 : // TLOG(TLVL_DEBUG + 32) << GetTraceName() << "receiveFragment: Polling fd to see if there's data" ;
215 12 : int num_fds_ready = poll(&pollfds[0], fd_count, timeout_ms);
216 12 : if (num_fds_ready <= 0)
217 : {
218 18 : TLOG(TLVL_DEBUG + 34) << GetTraceName() << "No data on receive socket, returning RECV_TIMEOUT";
219 9 : return RECV_TIMEOUT;
220 : }
221 :
222 3 : size_t index = 0;
223 3 : if (getLastActiveFD_(source_rank()) != -1)
224 : {
225 2 : for (auto& pollfd : pollfds)
226 : {
227 2 : index++;
228 2 : if (pollfd.fd == getLastActiveFD_(source_rank()))
229 : {
230 2 : break;
231 : }
232 : }
233 : }
234 :
235 3 : int active_index = -1;
236 3 : int16_t anomolous_events = 0;
237 3 : for (size_t ii = index; ii < index + pollfds.size(); ++ii)
238 : {
239 3 : auto pollfd_index = (ii + index) % pollfds.size();
240 3 : setActiveFD_(source_rank(), pollfds[pollfd_index].fd);
241 3 : if ((pollfds[pollfd_index].revents & (POLLIN | POLLPRI)) != 0)
242 : {
243 3 : active_index = pollfd_index;
244 3 : break;
245 : }
246 0 : if ((pollfds[pollfd_index].revents & (POLLHUP | POLLERR)) != 0)
247 : {
248 0 : disconnect_receive_socket_("Poll returned POLLHUP or POLLERR, indicating problems with the sender.");
249 0 : continue;
250 : }
251 0 : else if ((pollfds[pollfd_index].revents & (POLLNVAL)) != 0)
252 : {
253 0 : TLOG(TLVL_DEBUG + 32) << GetTraceName() << "FD is closed, most likely because the peer went away. Removing from fd list.";
254 0 : disconnect_receive_socket_("FD is closed, most likely because the peer went away.");
255 0 : continue;
256 0 : }
257 0 : else if (pollfds[pollfd_index].revents != 0)
258 : {
259 0 : anomolous_events |= pollfds[pollfd_index].revents;
260 : }
261 : }
262 :
263 3 : if (active_index == -1)
264 : {
265 0 : if (anomolous_events != 0)
266 : {
267 0 : TLOG(TLVL_DEBUG + 32) << GetTraceName() << "Wrong event received from a pollfd. Mask: " << static_cast<int>(anomolous_events);
268 : }
269 0 : setActiveFD_(source_rank(), -1);
270 0 : continue;
271 0 : }
272 :
273 3 : if (!done && timeout_usec > 0)
274 : {
275 : // calc next timeout_ms (unless timed out)
276 3 : size_t delta_us = TimeUtils::gettimeofday_us() - start_time_us;
277 3 : if (delta_us > timeout_usec)
278 : {
279 0 : return RECV_TIMEOUT;
280 : }
281 3 : timeout_ms = ((timeout_usec - delta_us) + 999) / 1000; // want at least 1 ms
282 : }
283 12 : }
284 5 : if (loop_guard > 10) { usleep(1000); }
285 5 : if (++loop_guard > 10010)
286 : {
287 0 : TLOG(TLVL_WARNING) << GetTraceName() << "loop guard triggered, returning RECV_TIMEOUT";
288 0 : usleep(receive_err_wait_us_);
289 0 : setActiveFD_(source_rank(), -1);
290 0 : return RECV_TIMEOUT;
291 : }
292 :
293 5 : if (state == SocketState::Metadata)
294 : {
295 : // TLOG(TLVL_DEBUG + 32) << GetTraceName() << "Reading Message Header" ;
296 3 : buff = &(mha[offset]); // NOLINT(cppcoreguidelines-pro-bounds-constant-array-index)
297 3 : byte_cnt = sizeof(MessHead) - offset;
298 : }
299 : else
300 : {
301 : // TLOG(TLVL_DEBUG + 32) << GetTraceName() << "Reading data" ;
302 2 : buff = reinterpret_cast<uint8_t*>(&header) + offset; // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast,cppcoreguidelines-pro-bounds-pointer-arithmetic)
303 2 : byte_cnt = target_bytes - offset;
304 : }
305 : // if (byte_cnt > sizeof(MessHead))
306 : // {
307 : // TLOG(TLVL_ERROR) << "Invalid byte count for read (count=" << byte_cnt
308 : // << ",offset=" << offset << ",mh.byte_count=" << mh.byte_count
309 : // << "), skipping read and returning RECV_TIMEOUT";
310 : // return RECV_TIMEOUT;
311 : // }
312 :
313 5 : auto fd = getActiveFD_(source_rank());
314 5 : if (byte_cnt > 0)
315 : {
316 10 : TLOG(TLVL_DEBUG + 35) << GetTraceName() << "Reading " << byte_cnt << " bytes from socket " << fd;
317 5 : sts = read(fd, buff, byte_cnt);
318 10 : TLOG(TLVL_DEBUG + 35) << GetTraceName() << "Done with read sts=" << sts;
319 : }
320 5 : if (sts > 0)
321 : {
322 5 : loop_guard = 0;
323 5 : last_recv_time_ = std::chrono::steady_clock::now();
324 : }
325 :
326 10 : TLOG(TLVL_DEBUG + 36) << GetTraceName() << "state=" << static_cast<int>(state) << " read=" << sts;
327 5 : if (sts < 0 && errno != EAGAIN)
328 : {
329 0 : TLOG(TLVL_WARNING) << GetTraceName() << "Error on receive, closing socket " << fd
330 0 : << " (errno=" << errno << ": " << strerror(errno) << ")";
331 0 : disconnect_receive_socket_("Error on receive");
332 0 : }
333 5 : else if (sts <= 0 && errno == EAGAIN)
334 : {
335 0 : if (!noDataWarningSent)
336 : {
337 0 : TLOG(TLVL_WARNING) << GetTraceName() << "No data received, is the sender still sending?!?";
338 0 : noDataWarningSent = true;
339 : }
340 0 : if (TimeUtils::GetElapsedTime(last_recv_time_) > receive_disconnected_wait_s_)
341 : {
342 0 : TLOG(TLVL_ERROR) << GetTraceName() << "No data received within timeout, aborting!";
343 0 : return RECV_TIMEOUT;
344 : }
345 0 : }
346 : else
347 : {
348 : // see if we're done (with this state)
349 10 : TLOG(TLVL_DEBUG + 36) << GetTraceName() << "Checking for complete sts=" << sts << ", offset=" << offset << ", target_bytes=" << target_bytes;
350 5 : sts = offset += sts;
351 5 : if (sts >= target_bytes)
352 : {
353 10 : TLOG(TLVL_DEBUG + 36) << GetTraceName() << "Target read bytes reached. Changing state";
354 5 : offset = 0;
355 5 : if (state == SocketState::Metadata)
356 : {
357 3 : state = SocketState::Data;
358 3 : mh.byte_count = ntohl(mh.byte_count);
359 3 : mh.source_id = ntohs(mh.source_id);
360 3 : target_bytes = mh.byte_count;
361 6 : TLOG(TLVL_DEBUG + 36) << GetTraceName() << "Expected header size = " << target_bytes << ", sizeof(RawFragmentHeader) = " << sizeof(artdaq::detail::RawFragmentHeader);
362 : // assert(target_bytes == sizeof(artdaq::detail::RawFragmentHeader) || target_bytes == 0);
363 :
364 3 : if (mh.message_type == MessHead::stop_v0)
365 : {
366 2 : disconnect_receive_socket_("Stop Message received.");
367 : }
368 2 : else if (mh.message_type == MessHead::data_v0 || mh.message_type == MessHead::data_more_v0)
369 : {
370 0 : TLOG(TLVL_WARNING) << GetTraceName() << "Message header indicates that Fragment data follows when I was expecting a Fragment header!";
371 0 : disconnect_receive_socket_("Desync detected");
372 : }
373 :
374 3 : if (target_bytes == 0)
375 : {
376 : // Probably a stop_v0, return timeout so we can try again.
377 1 : return RECV_TIMEOUT;
378 : }
379 : }
380 : else
381 : {
382 2 : ret_rank = source_rank();
383 4 : TLOG(TLVL_DEBUG + 35) << GetTraceName() << "done sts=" << sts << " src=" << ret_rank;
384 4 : TLOG(TLVL_DEBUG + 36) << GetTraceName() << "Done receiving fragment header. Moving into output.";
385 :
386 2 : done = true; // no more polls
387 : // break; // no more read of ready fds
388 :
389 : {
390 2 : std::lock_guard<std::mutex> lk(fd_mutex_);
391 2 : first_fragment_received_[source_rank()].insert(fd);
392 2 : }
393 : }
394 : }
395 : }
396 :
397 : } // while(!done)...poll
398 :
399 4 : TLOG(TLVL_DEBUG + 34) << GetTraceName() << "Returning " << ret_rank;
400 2 : return ret_rank;
401 : }
402 :
403 1 : void artdaq::TCPSocketTransfer::disconnect_receive_socket_(const std::string& msg)
404 : {
405 1 : std::lock_guard<std::mutex> lk(fd_mutex_);
406 1 : auto fd = active_receive_fds_[source_rank()];
407 4 : TLOG(TLVL_WARNING) << GetTraceName() << "disconnect_receive_socket_: " << msg << " Closing socket " << fd << " for rank " << source_rank();
408 1 : close(fd);
409 1 : if (connected_fds_.count(source_rank()) != 0u)
410 : {
411 1 : connected_fds_[source_rank()].erase(fd);
412 : }
413 1 : if (first_fragment_received_.count(source_rank()) != 0)
414 : {
415 1 : first_fragment_received_[source_rank()].erase(fd);
416 : }
417 1 : active_receive_fds_[source_rank()] = -1;
418 2 : TLOG(TLVL_DEBUG + 32) << GetTraceName() << "disconnect_receive_socket_: There are now " << connected_fds_[source_rank()].size() << " active senders.";
419 1 : }
420 :
421 2 : int artdaq::TCPSocketTransfer::receiveFragmentData(RawDataType* destination, size_t /*wordCount*/)
422 : {
423 4 : TLOG(TLVL_DEBUG + 39) << GetTraceName() << "receiveFragmentData: BEGIN";
424 2 : int ret_rank = RECV_TIMEOUT;
425 2 : if (getActiveFD_(source_rank()) == -1)
426 : { // what if just listen_fd???
427 0 : TLOG(TLVL_ERROR) << GetTraceName() << "receiveFragmentData: Receive socket not connected, returning RECV_TIMEOUT (Will result in \"Unexpected return code error\")";
428 0 : return RECV_TIMEOUT;
429 : }
430 :
431 : // void* buff=alloca(max_fragment_size_words_*8);
432 : uint8_t* buff;
433 2 : size_t byte_cnt = 0;
434 : int sts;
435 2 : size_t offset = 0;
436 2 : SocketState state = SocketState::Metadata;
437 2 : size_t target_bytes = sizeof(MessHead);
438 :
439 : pollfd pollfd_s;
440 2 : pollfd_s.events = POLLIN | POLLPRI | POLLERR;
441 2 : pollfd_s.fd = getActiveFD_(source_rank());
442 :
443 2 : int loop_guard = 0;
444 2 : bool done = false;
445 2 : bool noDataWarningSent = false;
446 2 : last_recv_time_ = std::chrono::steady_clock::now();
447 6 : while (!done)
448 : {
449 8 : TLOG(TLVL_DEBUG + 33) << GetTraceName() << "receiveFragmentData: Polling fd to see if there's data";
450 4 : int num_fds_ready = poll(&pollfd_s, 1, 1000);
451 8 : TLOG(TLVL_DEBUG + 33) << GetTraceName() << "receiveFragmentData: Polled fd to see if there's data"
452 4 : << ", num_fds_ready = " << num_fds_ready;
453 4 : if (num_fds_ready <= 0)
454 : {
455 0 : if (num_fds_ready == 0)
456 : {
457 0 : TLOG(TLVL_WARNING) << GetTraceName() << "receiveFragmentData: No data from " << source_rank() << " in " << TimeUtils::GetElapsedTimeMilliseconds(last_recv_time_) << " ms!"
458 0 : << " State = " << (state == SocketState::Metadata ? "Metadata" : "Data") << ", recvd/total=" << offset << "/" << target_bytes << " (delta=" << target_bytes - offset << ")";
459 :
460 0 : if (TimeUtils::GetElapsedTime(last_recv_time_) > receive_disconnected_wait_s_)
461 : {
462 0 : TLOG(TLVL_WARNING) << GetTraceName() << "receiveFragmentData: No data received within timeout (" << TimeUtils::GetElapsedTime(last_recv_time_) << " / " << receive_disconnected_wait_s_ << " ), returning RECV_TIMEOUT";
463 0 : disconnect_receive_socket_("No data on this socket within timeout");
464 0 : return RECV_TIMEOUT;
465 : }
466 0 : continue;
467 0 : }
468 :
469 0 : TLOG(TLVL_ERROR) << "Error in poll: errno=" << errno;
470 0 : break;
471 : }
472 :
473 4 : last_recv_time_ = std::chrono::steady_clock::now();
474 :
475 4 : if ((pollfd_s.revents & (POLLIN | POLLPRI)) != 0)
476 : {
477 : // Expected, don't have to check revents any further
478 : }
479 0 : else if ((pollfd_s.revents & (POLLNVAL)) != 0)
480 : {
481 0 : disconnect_receive_socket_("FD is closed, most likely because the peer went away.");
482 0 : break;
483 : }
484 0 : else if ((pollfd_s.revents & (POLLHUP | POLLERR)) != 0)
485 : {
486 0 : disconnect_receive_socket_("Poll returned POLLHUP or POLLERR, indicating problems with the sender.");
487 0 : break;
488 : }
489 : else
490 : {
491 0 : TLOG(TLVL_WARNING) << GetTraceName() << "receiveFragmentData: Wrong event received from pollfd: " << pollfd_s.revents;
492 0 : disconnect_receive_socket_("Wrong event received from pollfd.");
493 0 : break;
494 : }
495 :
496 4 : if (state == SocketState::Metadata)
497 : {
498 : // TLOG(TLVL_DEBUG + 32) << GetTraceName() << "receiveFragmentData: Reading Message Header" ;
499 2 : buff = &(mha[offset]); // NOLINT(cppcoreguidelines-pro-bounds-constant-array-index)
500 2 : byte_cnt = sizeof(MessHead) - offset;
501 : }
502 : else
503 : {
504 : // TLOG(TLVL_DEBUG + 32) << GetTraceName() << "receiveFragmentData: Reading data" ;
505 2 : buff = reinterpret_cast<uint8_t*>(destination) + offset; // NOLINT(cppcoreguidelines-pro-bounds-pointer-arithmetic,cppcoreguidelines-pro-type-reinterpret-cast)
506 2 : byte_cnt = mh.byte_count - offset;
507 : }
508 :
509 8 : TLOG(TLVL_DEBUG + 38) << GetTraceName() << "receiveFragmentData: Reading " << byte_cnt << " bytes from socket into " << static_cast<void*>(buff);
510 4 : sts = read(getActiveFD_(source_rank()), buff, byte_cnt);
511 : // TLOG(TLVL_DEBUG + 32) << GetTraceName() << "receiveFragmentData: Done with read" ;
512 :
513 8 : TLOG(TLVL_DEBUG + 38) << GetTraceName() << "recvFragment state=" << static_cast<int>(state) << " read=" << sts;
514 :
515 4 : if (sts == 0 || (sts < 0 && errno == EAGAIN))
516 : {
517 0 : sts = 0; // Treat EAGAIN as receiving no data
518 0 : if (loop_guard > 10) { usleep(1000); }
519 0 : if (++loop_guard > 10010)
520 : {
521 0 : TLOG(TLVL_WARNING) << GetTraceName() << "receiveFragmentData: loop guard triggered, returning RECV_TIMEOUT";
522 0 : setActiveFD_(source_rank(), -1);
523 0 : return RECV_TIMEOUT;
524 : }
525 0 : }
526 4 : else if (sts > 0)
527 : {
528 4 : loop_guard = 0;
529 4 : last_recv_time_ = std::chrono::steady_clock::now();
530 : }
531 :
532 4 : if (sts < 0)
533 : {
534 0 : TLOG(TLVL_WARNING) << GetTraceName() << "receiveFragmentData: Error on receive, closing socket"
535 0 : << " (errno=" << errno << ": " << strerror(errno) << ")";
536 0 : disconnect_receive_socket_("Error on receive");
537 : }
538 4 : else if (sts == 0)
539 : {
540 0 : if (!noDataWarningSent)
541 : {
542 0 : TLOG(TLVL_WARNING) << GetTraceName() << "receiveFragmentData: No data received, is the sender still sending?!?";
543 0 : noDataWarningSent = true;
544 : }
545 0 : if (TimeUtils::GetElapsedTime(last_recv_time_) > receive_disconnected_wait_s_)
546 : {
547 0 : TLOG(TLVL_ERROR) << GetTraceName() << "receiveFragmentData: No data received within timeout, aborting!";
548 0 : return RECV_TIMEOUT;
549 : }
550 : }
551 : else
552 : {
553 : // see if we're done (with this state)
554 4 : sts = offset += sts;
555 4 : if (static_cast<size_t>(sts) >= target_bytes)
556 : {
557 8 : TLOG(TLVL_DEBUG + 42) << GetTraceName() << "receiveFragmentData: Target read bytes reached. Changing state";
558 4 : offset = 0;
559 4 : if (state == SocketState::Metadata)
560 : {
561 2 : state = SocketState::Data;
562 2 : mh.byte_count = ntohl(mh.byte_count);
563 2 : mh.source_id = ntohs(mh.source_id);
564 2 : target_bytes = mh.byte_count;
565 :
566 2 : if (mh.message_type == MessHead::header_v0)
567 : {
568 0 : TLOG(TLVL_WARNING) << GetTraceName() << "receiveFragmentData: Message header indicates that a Fragment header follows when I was expecting Fragment data!";
569 0 : disconnect_receive_socket_("Desync detected");
570 : }
571 : }
572 : else
573 : {
574 2 : ret_rank = source_rank();
575 4 : TLOG(TLVL_DEBUG + 41) << GetTraceName() << "receiveFragmentData done sts=" << sts << " src=" << ret_rank;
576 4 : TLOG(TLVL_DEBUG + 39) << GetTraceName() << "receiveFragmentData: Done receiving fragment. Moving into output.";
577 :
578 : #if USE_ACKS
579 : send_ack_(active_receive_fd_);
580 : #endif
581 :
582 2 : done = true; // no more polls
583 : // break; // no more read of ready fds
584 : }
585 : }
586 : }
587 :
588 : // Check if we were asked to do a 0-size receive
589 4 : if (target_bytes == 0 && state == SocketState::Data)
590 : {
591 0 : ret_rank = source_rank();
592 0 : TLOG(TLVL_DEBUG + 41) << GetTraceName() << "receiveFragmentData done sts=" << sts << " src=" << ret_rank;
593 0 : TLOG(TLVL_DEBUG + 39) << GetTraceName() << "receiveFragmentData: Done receiving fragment. Moving into output.";
594 :
595 : #if USE_ACKS
596 : send_ack_(active_receive_fd_);
597 : #endif
598 :
599 0 : done = true; // no more polls
600 : }
601 :
602 : } // while(!done)...poll
603 :
604 2 : setLastActiveFD_(source_rank(), getActiveFD_(source_rank()));
605 2 : setActiveFD_(source_rank(), -1);
606 :
607 4 : TLOG(TLVL_DEBUG + 39) << GetTraceName() << "receiveFragmentData: Returning rank " << ret_rank;
608 2 : return ret_rank;
609 : }
610 :
611 2 : bool artdaq::TCPSocketTransfer::isRunning()
612 : {
613 2 : switch (role())
614 : {
615 0 : case TransferInterface::Role::kSend:
616 0 : return send_fd_ != -1;
617 2 : case TransferInterface::Role::kReceive:
618 2 : auto count = getConnectedFDCount_(source_rank());
619 4 : TLOG(TLVL_DEBUG + 32) << GetTraceName() << "isRunning: There are " << count << " fds connected.";
620 2 : return count > 0;
621 : }
622 0 : return false;
623 : }
624 :
625 3 : void artdaq::TCPSocketTransfer::flush_buffers()
626 : {
627 3 : std::set<int> fds;
628 3 : auto rank = TransferInterface::source_rank();
629 : {
630 3 : std::lock_guard<std::mutex> lk(fd_mutex_);
631 3 : if (connected_fds_.count(rank) != 0)
632 : {
633 1 : fds = connected_fds_[rank];
634 1 : connected_fds_.erase(rank);
635 : }
636 3 : if (first_fragment_received_.count(rank))
637 : {
638 1 : first_fragment_received_.erase(rank);
639 : }
640 3 : }
641 :
642 : char discard_buf[0x1000];
643 3 : for (auto& fd : fds)
644 : {
645 0 : TLOG(TLVL_INFO) << GetTraceName() << "flush_buffers: Checking for data in socket " << fd << " for rank " << rank;
646 0 : size_t bytes_read = 0;
647 0 : while (int sts = static_cast<int>(read(fd, discard_buf, sizeof(discard_buf)) > 0))
648 : {
649 0 : bytes_read += sts;
650 0 : }
651 0 : if (bytes_read > 0)
652 : {
653 0 : TLOG(TLVL_WARNING) << GetTraceName() << "flush_buffers: Flushed " << bytes_read << " bytes from socket " << fd << " for rank " << rank;
654 : }
655 0 : TLOG(TLVL_INFO) << GetTraceName() << "flush_buffers: Closing socket " << fd << " for rank " << rank;
656 0 : close(fd);
657 : }
658 3 : active_receive_fds_[rank] = -1;
659 3 : last_active_receive_fds_[rank] = -1;
660 3 : }
661 :
662 : // Send the given Fragment. Return the rank of the destination to which
663 : // the Fragment was sent OR -1 if to none.
664 2 : artdaq::TransferInterface::CopyStatus artdaq::TCPSocketTransfer::sendFragment_(Fragment&& frag, size_t send_timeout_usec)
665 : {
666 4 : TLOG(TLVL_DEBUG + 42) << GetTraceName() << "sendFragment begin send of fragment with sequenceID=" << frag.sequenceID();
667 2 : artdaq::Fragment grab_ownership_frag = std::move(frag);
668 :
669 2 : reconnect_();
670 2 : if (send_fd_ == -1 && connection_was_lost_)
671 : {
672 0 : TLOG(TLVL_INFO) << GetTraceName() << "reconnection attempt failed, returning quickly.";
673 0 : return TransferInterface::CopyStatus::kErrorNotRequiringException;
674 : }
675 :
676 : // Send Fragment Header
677 :
678 : #if USE_ACKS
679 : // Wait for fragments to be received
680 : while (static_cast<size_t>(send_ack_diff_) > buffer_count_) usleep(10000);
681 : #endif
682 :
683 2 : iovec iov = {static_cast<void*>(grab_ownership_frag.headerAddress()),
684 2 : detail::RawFragmentHeader::num_words() * sizeof(RawDataType)};
685 :
686 2 : auto sts = sendData_(&iov, 1, send_retry_timeout_us_, true);
687 2 : auto start_time = std::chrono::steady_clock::now();
688 : // If it takes more than 10 seconds to write a Fragment header, give up
689 2 : while (sts == CopyStatus::kTimeout && (send_timeout_usec == 0 || TimeUtils::GetElapsedTimeMicroseconds(start_time) < send_timeout_usec) && TimeUtils::GetElapsedTimeMicroseconds(start_time) < 10000000)
690 : {
691 0 : TLOG(TLVL_DEBUG + 43) << GetTraceName() << "sendFragment: Timeout sending fragment";
692 0 : sts = sendData_(&iov, 1, send_retry_timeout_us_, true);
693 0 : usleep(1000);
694 : }
695 2 : if (sts != CopyStatus::kSuccess)
696 : {
697 0 : return sts;
698 : }
699 :
700 : // Send Fragment Data
701 :
702 2 : iov = {static_cast<void*>(grab_ownership_frag.headerAddress() + detail::RawFragmentHeader::num_words()), // NOLINT(cppcoreguidelines-pro-bounds-pointer-arithmetic)
703 2 : grab_ownership_frag.sizeBytes() - detail::RawFragmentHeader::num_words() * sizeof(RawDataType)};
704 2 : sts = sendData_(&iov, 1, send_retry_timeout_us_);
705 2 : start_time = std::chrono::steady_clock::now();
706 2 : while (sts == CopyStatus::kTimeout && (send_timeout_usec == 0 || TimeUtils::GetElapsedTimeMicroseconds(start_time) < send_timeout_usec) && TimeUtils::GetElapsedTimeMicroseconds(start_time) < 10000000)
707 : {
708 0 : TLOG(TLVL_DEBUG + 43) << GetTraceName() << "sendFragment: Timeout sending fragment";
709 0 : sts = sendData_(&iov, 1, send_retry_timeout_us_);
710 0 : usleep(1000);
711 : }
712 :
713 : #if USE_ACKS
714 : send_ack_diff_++;
715 : #endif
716 :
717 4 : TLOG(TLVL_DEBUG + 42) << GetTraceName() << "sendFragment returning " << CopyStatusToString(sts);
718 2 : return sts;
719 2 : }
720 :
721 0 : artdaq::TransferInterface::CopyStatus artdaq::TCPSocketTransfer::sendData_(const void* buf, size_t bytes, size_t send_timeout_usec, bool isHeader)
722 : {
723 0 : TLOG(TLVL_DEBUG + 32) << GetTraceName() << "sendData_ Converting buf to iovec";
724 0 : iovec iov = {const_cast<void*>(buf), bytes}; // NOLINT(cppcoreguidelines-pro-type-const-cast)
725 0 : return sendData_(&iov, 1, send_timeout_usec, isHeader);
726 : }
727 :
728 4 : artdaq::TransferInterface::CopyStatus artdaq::TCPSocketTransfer::sendData_(const struct iovec* iov, int iovcnt, size_t send_timeout_usec, bool isHeader)
729 : {
730 : // check all connected??? -- currently just check fd!=-1
731 4 : if (send_fd_ == -1)
732 : {
733 0 : if (timeoutMessageArmed_)
734 : {
735 0 : TLOG(TLVL_DEBUG + 32) << GetTraceName() << "sendData_: Send fd is not open. Returning kTimeout";
736 0 : timeoutMessageArmed_ = false;
737 : }
738 0 : return CopyStatus::kTimeout;
739 : }
740 4 : timeoutMessageArmed_ = true;
741 8 : TLOG(TLVL_DEBUG + 44) << GetTraceName() << "send_timeout_usec is " << send_timeout_usec << ", currently unused.";
742 :
743 : // TLOG(TLVL_DEBUG + 32) << GetTraceName() << "sendData_: Determining write size" ;
744 4 : uint32_t total_to_write_bytes = 0;
745 8 : std::vector<iovec> iov_in(iovcnt + 1); // need contiguous (for the unlike case that only partial MH
746 4 : std::vector<iovec> iovv(iovcnt + 2); // 1 more for mh and another one for any partial
747 : int ii;
748 8 : for (ii = 0; ii < iovcnt; ++ii)
749 : {
750 4 : iov_in[ii + 1] = iov[ii]; // NOLINT(cppcoreguidelines-pro-bounds-pointer-arithmetic)
751 4 : total_to_write_bytes += iov[ii].iov_len; // NOLINT(cppcoreguidelines-pro-bounds-pointer-arithmetic)
752 : }
753 : // TLOG(TLVL_DEBUG + 32) << GetTraceName() << "sendData_: Constructing Message Header" ;
754 4 : MessHead mh = {0, isHeader ? MessHead::header_v0 : MessHead::data_v0, htons(source_rank()), {htonl(total_to_write_bytes)}};
755 4 : iov_in[0].iov_base = &mh;
756 4 : iov_in[0].iov_len = sizeof(mh);
757 4 : total_to_write_bytes += sizeof(mh);
758 :
759 4 : ssize_t sts = 0;
760 4 : ssize_t total_written_bytes = 0;
761 4 : ssize_t per_write_max_bytes = (32 * 1024);
762 :
763 4 : size_t in_iov_idx = 0; // only increment this when we know the associated data has been xferred
764 4 : size_t out_iov_idx = 0;
765 4 : ssize_t this_write_bytes = 0;
766 :
767 : do
768 : {
769 : // The first out_iov may be set at the end of the previous loop.
770 : // iov looping from below (b/c of the latter, we need to check this_write_bytes)
771 4 : for (;
772 12 : (in_iov_idx + out_iov_idx) < iov_in.size() && this_write_bytes < per_write_max_bytes;
773 : ++out_iov_idx)
774 : {
775 8 : this_write_bytes += iov_in[in_iov_idx + out_iov_idx].iov_len;
776 8 : iovv[out_iov_idx] = iov_in[in_iov_idx + out_iov_idx];
777 : }
778 4 : if (this_write_bytes > per_write_max_bytes)
779 : {
780 0 : iovv[out_iov_idx - 1].iov_len -= this_write_bytes - per_write_max_bytes;
781 0 : this_write_bytes = per_write_max_bytes;
782 : }
783 :
784 : // need to do blocking algorithm -- including throttled block notifications
785 4 : do_again:
786 : #ifndef __OPTIMIZE__ // This can be an expensive TRACE call (even if disabled) due to multiplicity of calls
787 8 : TLOG(TLVL_DEBUG + 44) << GetTraceName() << "sendFragment b4 writev " << std::setw(7) << total_written_bytes << " total_written_bytes send_fd_=" << send_fd_ << " in_idx=" << in_iov_idx
788 4 : << " iovcnt=" << out_iov_idx << " 1st.len=" << iovv[0].iov_len;
789 : #endif
790 : // TLOG(TLVL_DEBUG + 32) << GetTraceName() << " calling writev" ;
791 : #if USE_SENDMSG
792 : msghdr msg;
793 4 : memset(&msg, 0, sizeof(msghdr));
794 4 : msg.msg_iov = &(iovv[0]);
795 4 : msg.msg_iovlen = out_iov_idx; // at this point out_iov_idx is really the count (not an idx per se)
796 4 : sts = sendmsg(send_fd_, &msg, MSG_NOSIGNAL | (blocking != 0u ? 0 : MSG_DONTWAIT));
797 : #else
798 : sts = writev(send_fd_, &(iovv[0]), out_iov_idx); // SIGPIPE may occur -- need signal handler or mask/ignore
799 : #endif
800 : // TLOG(TLVL_DEBUG + 32) << GetTraceName() << " done with writev" ;
801 :
802 4 : if (sts == -1)
803 : {
804 0 : if (errno == EAGAIN /* same as EWOULDBLOCK */)
805 : {
806 0 : TLOG(TLVL_DEBUG + 32) << GetTraceName() << "sendFragment EWOULDBLOCK";
807 0 : blocking = 1u;
808 :
809 0 : fcntl(send_fd_, F_SETFL, 0); // clear O_NONBLOCK
810 :
811 : // NOTE: YES -- could drop here
812 0 : goto do_again;
813 : }
814 0 : TLOG(TLVL_WARNING) << GetTraceName() << "sendFragment_: WRITE ERROR " << errno << ": " << strerror(errno);
815 0 : connect_state = 0; // any write error closes
816 0 : close(send_fd_);
817 0 : send_fd_ = -1;
818 0 : connection_was_lost_ = true;
819 0 : return TransferInterface::CopyStatus::kErrorNotRequiringException;
820 : }
821 4 : if (sts != this_write_bytes)
822 : {
823 : // we'll loop around -- with
824 0 : TLOG(TLVL_DEBUG + 32) << GetTraceName() << "sendFragment writev sts(" << sts << ")!=requested_send_bytes(" << this_write_bytes << ")";
825 0 : total_written_bytes += sts; // add sts to total_written_bytes now as sts is adjusted next
826 : // find which iovs are done
827 0 : for (ii = 0; static_cast<size_t>(sts) >= iovv[ii].iov_len; ++ii)
828 : {
829 0 : sts -= iovv[ii].iov_len;
830 : }
831 0 : in_iov_idx += ii; // done with these in_iovs
832 0 : iovv[ii].iov_len -= sts; // adjust partial iov
833 0 : iovv[ii].iov_base = static_cast<uint8_t*>(iovv[ii].iov_base) + sts; // adjust partial iov // NOLINT(cppcoreguidelines-pro-bounds-pointer-arithmetic)
834 :
835 : // add more to get up to per_write_max_bytes
836 0 : out_iov_idx = 0;
837 0 : if (ii != 0)
838 : {
839 0 : iovv[out_iov_idx] = iovv[ii];
840 : }
841 : // starting over
842 0 : this_write_bytes = iovv[out_iov_idx].iov_len;
843 : // add any left over from appropriate in_iov_idx --
844 : // i.e. match this out_iov with the in_iov that was used to
845 : // initialize it; see how close the out base+len is to in base+len
846 : // check !>per_write_max_bytes
847 0 : auto additional = (reinterpret_cast<uintptr_t>(iov_in[in_iov_idx].iov_base) + iov_in[in_iov_idx].iov_len) - (reinterpret_cast<uintptr_t>(iovv[out_iov_idx].iov_base) + iovv[out_iov_idx].iov_len); // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
848 0 : if (additional != 0u)
849 : {
850 0 : iovv[out_iov_idx].iov_len += additional;
851 0 : this_write_bytes += additional;
852 0 : if (this_write_bytes > per_write_max_bytes)
853 : {
854 0 : iovv[out_iov_idx].iov_len -= this_write_bytes - per_write_max_bytes;
855 0 : this_write_bytes = per_write_max_bytes;
856 : }
857 : }
858 0 : ++out_iov_idx; // done with
859 0 : TLOG(TLVL_DEBUG + 33) << GetTraceName() << "sendFragment writev sts!=: this_write_bytes=" << this_write_bytes
860 0 : << " out_iov_idx=" << out_iov_idx
861 0 : << " additional=" << additional
862 0 : << " ii=" << ii;
863 : }
864 : else
865 : {
866 : #ifndef __OPTIMIZE__ // This can be an expensive TRACE call (even if disabled) due to multiplicity of calls
867 8 : TLOG(TLVL_DEBUG + 33) << GetTraceName() << "sendFragment writev sts(" << sts << ")==requested_send_bytes(" << this_write_bytes << ")";
868 : #endif
869 4 : total_written_bytes += sts;
870 4 : --out_iov_idx; // make it the index of the last iovv
871 4 : iovv[out_iov_idx].iov_base = static_cast<uint8_t*>(iovv[out_iov_idx].iov_base) + iovv[out_iov_idx].iov_len; // NOLINT(cppcoreguidelines-pro-bounds-pointer-arithmetic)
872 4 : iovv[out_iov_idx].iov_len = 0;
873 4 : in_iov_idx += out_iov_idx; // at least this many complete (one more if "last iovv" is complete
874 4 : this_write_bytes = 0;
875 : // need to check last iovv against appropriate iov_in
876 4 : auto additional = (reinterpret_cast<uintptr_t>(iov_in[in_iov_idx].iov_base) + iov_in[in_iov_idx].iov_len) - (reinterpret_cast<uintptr_t>(iovv[out_iov_idx].iov_base) + iovv[out_iov_idx].iov_len); // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
877 4 : if (additional != 0u)
878 : {
879 0 : iovv[out_iov_idx].iov_len += additional;
880 0 : this_write_bytes += additional;
881 0 : if (this_write_bytes > per_write_max_bytes)
882 : {
883 0 : iovv[out_iov_idx].iov_len -= this_write_bytes - per_write_max_bytes;
884 0 : this_write_bytes = per_write_max_bytes;
885 : }
886 0 : if (out_iov_idx != 0)
887 : {
888 0 : iovv[0] = iovv[out_iov_idx];
889 : }
890 0 : out_iov_idx = 1;
891 : }
892 : else
893 : {
894 4 : ++in_iov_idx;
895 4 : out_iov_idx = 0;
896 : }
897 : }
898 4 : } while (total_written_bytes < total_to_write_bytes);
899 4 : if (total_written_bytes > total_to_write_bytes)
900 : {
901 0 : TLOG(TLVL_ERROR) << GetTraceName() << "sendFragment program error: too many bytes transferred";
902 : }
903 :
904 4 : if (blocking != 0u)
905 : {
906 0 : blocking = 0u;
907 0 : fcntl(send_fd_, F_SETFL, O_NONBLOCK); // set O_NONBLOCK
908 : }
909 4 : sts = total_written_bytes - sizeof(MessHead);
910 :
911 8 : TLOG(TLVL_DEBUG + 44) << GetTraceName() << "sendFragment sts=" << sts;
912 4 : return TransferInterface::CopyStatus::kSuccess;
913 4 : }
914 :
915 1 : void artdaq::TCPSocketTransfer::connect_()
916 : {
917 1 : auto start_time = std::chrono::steady_clock::now();
918 :
919 : // Retry a few times if we can't connect
920 2 : while (send_fd_ == -1 && TimeUtils::GetElapsedTimeMicroseconds(start_time) < send_retry_timeout_us_ * 10)
921 : {
922 2 : TLOG(TLVL_DEBUG + 32) << GetTraceName() << "Connecting sender socket";
923 1 : int sndbuf_bytes = static_cast<int>(sndbuf_);
924 1 : if (sndbuf_ > INT_MAX)
925 : {
926 0 : sndbuf_bytes = INT_MAX;
927 0 : TLOG(TLVL_WARNING) << "Requested SNDBUF " << sndbuf_ << " too large, setting to INT_MAX: " << INT_MAX;
928 : }
929 2 : TLOG(TLVL_DEBUG + 32) << "Requested SNDBUF is " << sndbuf_bytes;
930 :
931 1 : send_fd_ = TCPConnect(hostMap_[destination_rank()].c_str(), port_, O_NONBLOCK, sndbuf_bytes);
932 1 : if (send_fd_ == -1)
933 : {
934 0 : if (connection_was_lost_) { break; }
935 :
936 0 : usleep(send_retry_timeout_us_);
937 : }
938 : }
939 1 : connect_state = 0;
940 1 : blocking = 0;
941 2 : TLOG(TLVL_DEBUG + 32) << GetTraceName() << "connect_ " + hostMap_[destination_rank()] + ":" << port_ << " send_fd_=" << send_fd_;
942 1 : if (send_fd_ != -1)
943 : {
944 : // write connect msg
945 2 : TLOG(TLVL_DEBUG + 32) << GetTraceName() << "connect_: Writing connect message";
946 1 : MessHead mh = {0, MessHead::connect_v0, htons(source_rank()), {htonl(CONN_MAGIC)}};
947 1 : ssize_t sts = write(send_fd_, &mh, sizeof(mh));
948 1 : if (sts == -1)
949 : {
950 0 : TLOG(TLVL_ERROR) << GetTraceName() << "connect_: Error writing connect message!";
951 : // a write error here is completely unexpected!
952 0 : connect_state = 0;
953 0 : close(send_fd_);
954 0 : send_fd_ = -1;
955 : }
956 : else
957 : {
958 4 : TLOG(TLVL_INFO) << GetTraceName() << "connect_: Successfully connected";
959 : // consider it all connected/established
960 1 : connect_state = 1;
961 1 : connection_was_lost_ = false;
962 : }
963 :
964 : #if USE_ACKS
965 : if (ack_listen_thread_ && ack_listen_thread_->joinable()) ack_listen_thread_->join();
966 : TLOG(TLVL_INFO) << GetTraceName() << "Starting Ack Listener Thread";
967 :
968 : try
969 : {
970 : ack_listen_thread_ = std::make_unique<boost::thread>(&TCPSocketTransfer::receive_acks_, this);
971 : char tname[16]; // Size 16 - see man page pthread_setname_np(3) and/or prctl(2)
972 : snprintf(tname, sizeof(tname) - 1, "%d-AckListen", my_rank); // NOLINT
973 : tname[sizeof(tname) - 1] = '\0'; // assure term. snprintf is not too evil :)
974 : auto handle = ack_listen_thread_.native_handle();
975 : pthread_setname_np(handle, tname);
976 : }
977 : catch (const boost::exception& e)
978 : {
979 : TLOG(TLVL_ERROR) << "Caught boost::exception starting TCP Socket Ack Listen thread: " << boost::diagnostic_information(e) << ", errno=" << errno;
980 : std::cerr << "Caught boost::exception starting TCP Socket Ack Listen thread: " << boost::diagnostic_information(e) << ", errno=" << errno << std::endl;
981 : exit(5);
982 : }
983 : #endif
984 : }
985 1 : }
986 :
987 2 : void artdaq::TCPSocketTransfer::reconnect_()
988 : {
989 2 : if (send_fd_ == -1 && role() == TransferInterface::Role::kSend)
990 : {
991 0 : TLOG(TLVL_DEBUG + 33) << GetTraceName() << "check/reconnect";
992 0 : return connect_();
993 : }
994 : }
995 :
996 2 : void artdaq::TCPSocketTransfer::start_listen_thread_()
997 : {
998 2 : std::lock_guard<std::mutex> start_lock(listen_thread_mutex_);
999 2 : if (listen_thread_refcount_ == 0)
1000 : {
1001 2 : if (listen_thread_ && listen_thread_->joinable())
1002 : {
1003 0 : listen_thread_->join();
1004 : }
1005 2 : listen_thread_refcount_ = 1;
1006 8 : TLOG(TLVL_INFO) << GetTraceName() << "Starting Listener Thread";
1007 :
1008 : try
1009 : {
1010 2 : listen_thread_ = std::make_unique<boost::thread>(&TCPSocketTransfer::listen_, port_, rcvbuf_);
1011 : char tname[16]; // Size 16 - see man page pthread_setname_np(3) and/or prctl(2)
1012 2 : snprintf(tname, sizeof(tname) - 1, "%d-Listen", my_rank); // NOLINT
1013 2 : tname[sizeof(tname) - 1] = '\0'; // assure term. snprintf is not too evil :)
1014 2 : auto handle = listen_thread_->native_handle();
1015 2 : pthread_setname_np(handle, tname);
1016 : }
1017 0 : catch (const boost::exception& e)
1018 : {
1019 0 : TLOG(TLVL_ERROR) << "Caught boost::exception starting TCP Socket Listen thread: " << boost::diagnostic_information(e) << ", errno=" << errno;
1020 0 : std::cerr << "Caught boost::exception starting TCP Socket Listen thread: " << boost::diagnostic_information(e) << ", errno=" << errno << std::endl;
1021 0 : exit(5);
1022 0 : }
1023 : }
1024 : else
1025 : {
1026 0 : listen_thread_refcount_++;
1027 : }
1028 2 : }
1029 :
1030 : #if USE_ACKS
1031 : void artdaq::TCPSocketTransfer::receive_acks_()
1032 : {
1033 : while (send_fd_ >= 0)
1034 : {
1035 : pollfd pollfd_s;
1036 : pollfd_s.events = POLLIN | POLLPRI;
1037 : pollfd_s.fd = send_fd_;
1038 :
1039 : TLOG(TLVL_DEBUG + 48) << GetTraceName() << "receive_acks_: Polling fd to see if there's data";
1040 : int num_fds_ready = poll(&pollfd_s, 1, 1000);
1041 : if (num_fds_ready <= 0)
1042 : {
1043 : if (num_fds_ready == 0)
1044 : {
1045 : TLOG(TLVL_DEBUG + 48) << GetTraceName() << "receive_acks_: No data on receive socket";
1046 : continue;
1047 : }
1048 :
1049 : TLOG(TLVL_ERROR) << "Error in poll: errno=" << errno;
1050 : break;
1051 : }
1052 :
1053 : if (pollfd_s.revents & (POLLIN | POLLPRI))
1054 : {
1055 : // Expected, don't have to check revents any further
1056 : }
1057 : else
1058 : {
1059 : TLOG(TLVL_DEBUG + 32) << GetTraceName() << "receive_acks_: Wrong event received from pollfd: " << pollfd_s.revents;
1060 : break;
1061 : }
1062 :
1063 : MessHead mh;
1064 : auto sts = read(send_fd_, &mh, sizeof(mh));
1065 :
1066 : if (sts != sizeof(mh))
1067 : {
1068 : TLOG(TLVL_ERROR) << GetTraceName() << "receive_ack_: Wrong message header length received! (actual " << sts << " != " << sizeof(mh) << " expected)";
1069 : continue;
1070 : }
1071 :
1072 : // check for "magic" and valid source_id(aka rank)
1073 : mh.source_id = ntohs(mh.source_id); // convert here as it is reference several times
1074 : if (mh.source_id != my_rank)
1075 : {
1076 : TLOG(TLVL_ERROR) << GetTraceName() << "receive_ack_: Received ack for different sender! Rank=" << my_rank << ", hdr=" << mh.source_id;
1077 : continue;
1078 : }
1079 : if (ntohl(mh.conn_magic) != ACK_MAGIC || !(mh.message_type == MessHead::ack_v0)) // Allow for future connect message versions
1080 : {
1081 : TLOG(TLVL_ERROR) << GetTraceName() << "receive_ack_: Wrong magic bytes in header!";
1082 : continue;
1083 : }
1084 :
1085 : TLOG(TLVL_DEBUG + 47) << GetTraceName() << "receive_acks_: Received ack message, diff is now " << (send_ack_diff_.load() - 1);
1086 : send_ack_diff_--;
1087 : }
1088 : }
1089 :
1090 : void artdaq::TCPSocketTransfer::send_ack_(int fd)
1091 : {
1092 : MessHead mh = {0, MessHead::ack_v0, htons(source_rank()), {htonl(ACK_MAGIC)}};
1093 : write(fd, &mh, sizeof(mh));
1094 : }
1095 : #endif
1096 :
1097 2 : void artdaq::TCPSocketTransfer::listen_(int port, size_t rcvbuf)
1098 : {
1099 2 : int listen_fd = -1;
1100 6 : while (listen_thread_refcount_ > 0)
1101 : {
1102 8 : TLOG(TLVL_DEBUG + 33) << "listen_: Listening/accepting new connections on port " << port;
1103 4 : if (listen_fd == -1)
1104 : {
1105 4 : TLOG(TLVL_DEBUG + 32) << "listen_: Opening listener";
1106 2 : listen_fd = TCP_listen_fd(port, rcvbuf);
1107 : }
1108 4 : if (listen_fd == -1)
1109 : {
1110 0 : TLOG(TLVL_DEBUG + 32) << "listen_: Error creating listen_fd!";
1111 0 : break;
1112 : }
1113 :
1114 : int res;
1115 4 : timeval tv = {2, 0}; // maybe increase of some global "debugging" flag set???
1116 : fd_set rfds;
1117 68 : FD_ZERO(&rfds);
1118 4 : FD_SET(listen_fd, &rfds); // NOLINT
1119 :
1120 4 : res = select(listen_fd + 1, &rfds, static_cast<fd_set*>(nullptr), static_cast<fd_set*>(nullptr), &tv);
1121 4 : if (res > 0)
1122 : {
1123 : int sts;
1124 : sockaddr_un un;
1125 1 : socklen_t arglen = sizeof(un);
1126 : int fd;
1127 2 : TLOG(TLVL_DEBUG + 32) << "listen_: Calling accept";
1128 1 : fd = accept(listen_fd, reinterpret_cast<sockaddr*>(&un), &arglen); // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
1129 2 : TLOG(TLVL_DEBUG + 32) << "listen_: Done with accept";
1130 :
1131 2 : TLOG(TLVL_DEBUG + 32) << "listen_: Reading connect message";
1132 1 : socklen_t lenlen = sizeof(tv);
1133 : /*sts=*/
1134 1 : setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, lenlen); // see man 7 socket.
1135 : MessHead mh;
1136 1 : uint64_t mark_us = TimeUtils::gettimeofday_us();
1137 1 : sts = read(fd, &mh, sizeof(mh));
1138 1 : uint64_t delta_us = TimeUtils::gettimeofday_us() - mark_us;
1139 2 : TLOG(TLVL_DEBUG + 32) << "listen_: Read of connect message took " << delta_us << " microseconds.";
1140 1 : if (sts != sizeof(mh))
1141 : {
1142 0 : TLOG(TLVL_DEBUG + 32) << "listen_: Wrong message header length received!";
1143 0 : close(fd);
1144 0 : continue;
1145 0 : }
1146 :
1147 : // check for "magic" and valid source_id(aka rank)
1148 1 : mh.source_id = ntohs(mh.source_id); // convert here as it is reference several times
1149 1 : if (ntohl(mh.conn_magic) != CONN_MAGIC || !(mh.message_type == MessHead::connect_v0)) // Allow for future connect message versions
1150 : {
1151 0 : TLOG(TLVL_DEBUG + 32) << "listen_: Wrong magic bytes in header!";
1152 0 : close(fd);
1153 0 : continue;
1154 0 : }
1155 :
1156 : // now add (new) connection
1157 1 : std::lock_guard<std::mutex> lk(fd_mutex_);
1158 1 : connected_fds_[mh.source_id].insert(fd);
1159 :
1160 3 : TLOG(TLVL_INFO) << "listen_: New fd is " << fd << " for source rank " << mh.source_id;
1161 1 : }
1162 : else
1163 : {
1164 6 : TLOG(TLVL_DEBUG + 46) << "listen_: No connections in timeout interval!";
1165 : }
1166 : }
1167 :
1168 6 : TLOG(TLVL_INFO) << "listen_: Shutting down connection listener";
1169 2 : if (listen_fd != -1)
1170 : {
1171 2 : close(listen_fd);
1172 : }
1173 2 : std::lock_guard<std::mutex> lk(fd_mutex_);
1174 2 : auto it = connected_fds_.begin();
1175 2 : while (it != connected_fds_.end())
1176 : {
1177 0 : auto& fd_set = it->second;
1178 0 : auto rank_it = fd_set.begin();
1179 0 : while (rank_it != fd_set.end())
1180 : {
1181 0 : close(*rank_it);
1182 0 : rank_it = fd_set.erase(rank_it);
1183 : }
1184 0 : it = connected_fds_.erase(it);
1185 : }
1186 2 : first_fragment_received_.clear();
1187 :
1188 2 : } // do_connect_
1189 :
1190 28 : size_t artdaq::TCPSocketTransfer::getConnectedFDCount_(int source_rank)
1191 : {
1192 28 : std::lock_guard<std::mutex> lk(fd_mutex_);
1193 : #ifndef __OPTIMIZE__
1194 56 : TLOG(TLVL_DEBUG + 45) << GetTraceName() << "getConnectedFDCount_: count is " << (connected_fds_.count(source_rank) != 0u ? connected_fds_[source_rank].size() : 0);
1195 : #endif
1196 56 : return connected_fds_.count(source_rank) != 0u ? connected_fds_[source_rank].size() : 0;
1197 28 : }
1198 :
1199 29 : int artdaq::TCPSocketTransfer::getActiveFD_(int source_rank)
1200 : {
1201 29 : std::lock_guard<std::mutex> lk(fd_mutex_);
1202 : #ifndef __OPTIMIZE__
1203 58 : TLOG(TLVL_DEBUG + 45) << GetTraceName() << "getActiveFD_: fd is " << (active_receive_fds_.count(source_rank) != 0u ? active_receive_fds_[source_rank] : -1);
1204 : #endif
1205 58 : return active_receive_fds_.count(source_rank) != 0u ? active_receive_fds_[source_rank] : -1;
1206 29 : }
1207 5 : void artdaq::TCPSocketTransfer::setActiveFD_(int source_rank, int fd)
1208 : {
1209 5 : std::lock_guard<std::mutex> lk(fd_mutex_);
1210 : #ifndef __OPTIMIZE__
1211 10 : TLOG(TLVL_DEBUG + 45) << GetTraceName() << "setActiveFD_: setting active fd for rank " << source_rank << " to " << fd;
1212 : #endif
1213 5 : active_receive_fds_[source_rank] = fd;
1214 5 : }
1215 5 : int artdaq::TCPSocketTransfer::getLastActiveFD_(int source_rank)
1216 : {
1217 5 : std::lock_guard<std::mutex> lk(fd_mutex_);
1218 : #ifndef __OPTIMIZE__
1219 10 : TLOG(TLVL_DEBUG + 45) << GetTraceName() << "getLastActiveFD_: fd is " << (last_active_receive_fds_.count(source_rank) != 0u ? last_active_receive_fds_[source_rank] : -1);
1220 : #endif
1221 10 : return last_active_receive_fds_.count(source_rank) != 0u ? last_active_receive_fds_[source_rank] : -1;
1222 5 : }
1223 2 : void artdaq::TCPSocketTransfer::setLastActiveFD_(int source_rank, int fd)
1224 : {
1225 2 : std::lock_guard<std::mutex> lk(fd_mutex_);
1226 : #ifndef __OPTIMIZE__
1227 4 : TLOG(TLVL_DEBUG + 45) << GetTraceName() << "setLastActiveFD_: setting last active fd for rank " << source_rank << " to " << fd;
1228 : #endif
1229 2 : last_active_receive_fds_[source_rank] = fd;
1230 2 : }
|