LCOV - code coverage report
Current view: top level - artdaq/TransferPlugins - TCPSocketTransfer.cc (source / functions) Coverage Total Hit
Test: artdaq.info.cleaned Lines: 67.4 % 601 405
Test Date: 2025-09-04 00:45:34 Functions: 65.3 % 118 77

            Line data    Source code
       1              : // Sep 14, 2016. "TERMS AND CONDITIONS" governing this file are in the README
       2              : // or COPYING file. If you do not have such a file, one can be obtained by
       3              : // contacting Ron or Fermi Lab in Batavia IL, 60510, phone: 630-840-3000.
       4              : // $RCSfile: .emacs.gnu,v $
       5              : // rev="$Revision: 1.30 $$Date: 2016/03/01 14:27:27 $";
       6              : 
       7              : // C Includes
       8              : #include <arpa/inet.h>   // ntohl, ntohs
       9              : #include <poll.h>        // struct pollfd
      10              : #include <sys/socket.h>  // socket, socklen_t
      11              : #include <sys/types.h>   // size_t
      12              : #include <sys/un.h>      // sockaddr_un
      13              : #include <cstdlib>       // atoi, strtoul
      14              : 
      15              : // C++ Includes
      16              : #include <atomic>
      17              : #include <fstream>
      18              : #include <memory>
      19              : #include <mutex>
      20              : #include <stdexcept>
      21              : #include <string>
      22              : 
      23              : // product Includes
      24              : #include "artdaq/DAQdata/Globals.hh"
      25              : #define TRACE_NAME (app_name + "_TCPSocketTransfer").c_str()
      26              : 
      27              : // artdaq Includes
      28              : #include <iomanip>
      29              : #include "artdaq-core/Data/Fragment.hh"
      30              : #include "artdaq-core/Utilities/TimeUtils.hh"
      31              : #include "artdaq/DAQdata/TCPConnect.hh"
      32              : #include "artdaq/DAQdata/TCP_listen_fd.hh"
      33              : #include "artdaq/TransferPlugins/TCPSocketTransfer.hh"
      34              : #include "artdaq/TransferPlugins/detail/SRSockets.hh"
      35              : #include "artdaq/TransferPlugins/detail/Timeout.hh"
      36              : 
      37              : #define USE_SENDMSG 1
      38              : 
      39              : std::atomic<int> artdaq::TCPSocketTransfer::listen_thread_refcount_(0);
      40              : std::unique_ptr<boost::thread> artdaq::TCPSocketTransfer::listen_thread_ = nullptr;
      41              : std::map<int, std::set<int>> artdaq::TCPSocketTransfer::connected_fds_ = std::map<int, std::set<int>>();
      42              : std::map<int, std::set<int>> artdaq::TCPSocketTransfer::first_fragment_received_ = std::map<int, std::set<int>>();
      43              : std::mutex artdaq::TCPSocketTransfer::listen_thread_mutex_;
      44              : std::mutex artdaq::TCPSocketTransfer::fd_mutex_;
      45              : 
      46            3 : artdaq::TCPSocketTransfer::
      47            3 :     TCPSocketTransfer(fhicl::ParameterSet const& pset, TransferInterface::Role role)
      48              :     : TransferInterface(pset, role)
      49            3 :     , send_fd_(-1)
      50            6 :     , port_(pset.get<int>("port", portMan->GetTCPSocketTransferPort(destination_rank())))
      51            6 :     , rcvbuf_(pset.get<size_t>("tcp_receive_buffer_size", 0))
      52            6 :     , sndbuf_(pset.get<size_t>("tcp_send_buffer_size", max_fragment_size_words_ * sizeof(artdaq::RawDataType) * buffer_count_))
      53            6 :     , send_retry_timeout_us_(pset.get<size_t>("send_retry_timeout_us", 1000000))
      54            3 :     , timeoutMessageArmed_(true)
      55            6 :     , receive_disconnected_wait_s_(pset.get<double>("receive_socket_disconnected_wait_s", 10.0))
      56            6 :     , receive_err_wait_us_(pset.get<size_t>("receive_socket_disconnected_wait_us", 10000))
      57            3 :     , receive_socket_has_been_connected_(false)
      58            9 :     , send_ack_diff_(0)
      59              : {
      60            6 :         TLOG(TLVL_DEBUG + 32) << GetTraceName() << " Constructor: pset=" << pset.to_string() << ", role=" << (role == TransferInterface::Role::kReceive ? "kReceive" : "kSend");
      61            3 :         connection_was_lost_ = false;
      62              : 
      63            3 :         if (role == TransferInterface::Role::kReceive)
      64              :         {
      65              :                 // Wait for sender to connect...
      66            4 :                 TLOG(TLVL_DEBUG + 32) << GetTraceName() << "Listening for connections";
      67            2 :                 start_listen_thread_();
      68            4 :                 TLOG(TLVL_DEBUG + 32) << GetTraceName() << "Done Listening";
      69              :         }
      70              :         else
      71              :         {
      72            1 :                 hostMap_ = MakeHostMap(pset);
      73            2 :                 TLOG(TLVL_DEBUG + 32) << GetTraceName() << "Connecting to destination";
      74            1 :                 connect_();
      75            2 :                 TLOG(TLVL_DEBUG + 32) << GetTraceName() << "Done Connecting";
      76              :         }
      77            6 :         TLOG(TLVL_DEBUG + 32) << GetTraceName() << "End of Constructor";
      78            3 : }
      79              : 
      80            5 : artdaq::TCPSocketTransfer::~TCPSocketTransfer() noexcept
      81              : {
      82            6 :         TLOG(TLVL_DEBUG + 32) << GetTraceName() << "Shutting down TCPSocketTransfer";
      83              : 
      84            3 :         if (role() == TransferInterface::Role::kSend)
      85              :         {
      86              :                 // close all open connections (send stop_v0) first
      87            1 :                 MessHead mh = {0, MessHead::stop_v0, htons(TransferInterface::source_rank()), {0}};
      88            1 :                 if (send_fd_ != -1)
      89              :                 {
      90              :                         // should be blocking with modest timeo
      91            1 :                         timeval tv = {0, 100000};
      92            1 :                         socklen_t len = sizeof(tv);
      93            1 :                         setsockopt(send_fd_, SOL_SOCKET, SO_SNDTIMEO, &tv, len);
      94            1 :                         write(send_fd_, &mh, sizeof(mh));
      95              :                 }
      96            1 :                 close(send_fd_);
      97            1 :                 send_fd_ = -1;
      98              :         }
      99              :         else
     100              :         {
     101            2 :                 TCPSocketTransfer::flush_buffers();
     102              :                 try
     103              :                 {
     104            2 :                         if (ack_listen_thread_ && ack_listen_thread_->joinable())
     105              :                         {
     106            0 :                                 ack_listen_thread_->join();
     107              :                         }
     108              :                 }
     109            0 :                 catch (...)
     110              :                 {
     111              :                         // IGNORED
     112            0 :                 }
     113              : 
     114            2 :                 std::lock_guard<std::mutex> lk(listen_thread_mutex_);
     115            2 :                 listen_thread_refcount_--;
     116              :                 try
     117              :                 {
     118            2 :                         if (listen_thread_refcount_ <= 0 && listen_thread_ && listen_thread_->joinable())
     119              :                         {
     120            2 :                                 listen_thread_->join();
     121              :                         }
     122              :                 }
     123            0 :                 catch (...)
     124              :                 {
     125              :                         // IGNORED
     126            0 :                 }
     127            2 :         }
     128              : 
     129            6 :         TLOG(TLVL_DEBUG + 32) << GetTraceName() << "End of Destructor";
     130            5 : }
     131              : 
     132           12 : int artdaq::TCPSocketTransfer::receiveFragmentHeader(detail::RawFragmentHeader& header, size_t timeout_usec)
     133              : {
     134           24 :         TLOG(TLVL_DEBUG + 34) << GetTraceName() << "receiveFragmentHeader BEGIN";
     135           12 :         int ret_rank = RECV_TIMEOUT;
     136              : 
     137              :         // Don't bomb out until received at least one connection...
     138           12 :         if (getConnectedFDCount_(source_rank()) == 0)
     139              :         {  // what if just listen_fd???
     140              :                 //      if (receive_socket_has_been_connected_ && TimeUtils::GetElapsedTime(last_recv_time_) > receive_disconnected_wait_s_)
     141              :                 //      {
     142              :                 //                      TLOG(TLVL_ERROR) << GetTraceName() << "receiveFragmentHeader: senders have been disconnected for "
     143              :                 //                              << TimeUtils::GetElapsedTime(last_recv_time_) << " s (receive_socket_disconnected_wait_s = " << receive_disconnected_wait_s_ << " s). RETURNING DATA_END!";
     144              :                 //                      return DATA_END;
     145              :                 //              }
     146              :                 // if (++not_connected_count_ > receive_err_threshold_) { return DATA_END; }
     147            0 :                 TLOG(TLVL_DEBUG + 36) << GetTraceName() << "Receive socket not connected, returning RECV_TIMEOUT";
     148            0 :                 usleep(receive_err_wait_us_);
     149            0 :                 return RECV_TIMEOUT;
     150              :         }
     151           12 :         receive_socket_has_been_connected_ = true;
     152           12 :         last_recv_time_ = std::chrono::steady_clock::now();
     153              : 
     154           24 :         TLOG(TLVL_DEBUG + 34) << GetTraceName() << "timeout_usec=" << timeout_usec;
     155              :         // void* buff=alloca(max_fragment_size_words_*8);
     156           12 :         size_t byte_cnt = 0;
     157              :         int sts;
     158           12 :         int offset = 0;
     159           12 :         SocketState state = SocketState::Metadata;
     160           12 :         int target_bytes = sizeof(MessHead);
     161           12 :         uint64_t start_time_us = TimeUtils::gettimeofday_us();
     162              : 
     163              :         // while (active_receive_fd_ != -1)
     164              :         //{
     165              :         //      TLOG(TLVL_DEBUG + 33) << GetTraceName() << "Currently receiving from fd " << active_receive_fd_ << ", waiting!";
     166              :         //      usleep(1000);
     167              :         // }
     168              : 
     169              :         uint8_t* buff;
     170              : 
     171              :         int timeout_ms;
     172           12 :         if (timeout_usec == 0)
     173              :         {
     174            0 :                 timeout_ms = 0;
     175              :         }
     176              :         else
     177              :         {
     178           12 :                 timeout_ms = (timeout_usec + 999) / 1000;  // want at least 1 ms
     179              :         }
     180              : 
     181           12 :         bool done = false;
     182           12 :         bool noDataWarningSent = false;
     183           12 :         int loop_guard = 0;
     184              : 
     185           16 :         while (!done && getConnectedFDCount_(source_rank()) > 0)
     186              :         {
     187           14 :                 if (getActiveFD_(source_rank()) == -1)
     188              :                 {
     189           12 :                         loop_guard = 0;
     190           12 :                         size_t fd_count = 0;
     191           12 :                         std::vector<pollfd> pollfds;
     192              :                         {
     193           12 :                                 std::lock_guard<std::mutex> lk(fd_mutex_);
     194           12 :                                 fd_count = connected_fds_[source_rank()].size();
     195           12 :                                 bool first = false;
     196           12 :                                 if (first_fragment_received_[source_rank()].size() < connected_fds_[source_rank()].size())
     197              :                                 {
     198            1 :                                         fd_count -= first_fragment_received_[source_rank()].size();
     199            1 :                                         first = true;
     200              :                                 }
     201           12 :                                 pollfds.resize(fd_count);
     202           12 :                                 auto iter = connected_fds_[source_rank()].begin();
     203           24 :                                 for (size_t ii = 0; ii < fd_count; ++ii)
     204              :                                 {
     205           12 :                                         while (first && first_fragment_received_[source_rank()].count(*iter))
     206              :                                         {
     207            0 :                                                 ++iter;
     208              :                                         }
     209           12 :                                         pollfds[ii].events = POLLIN | POLLPRI | POLLERR;
     210           12 :                                         pollfds[ii].fd = *iter;
     211           12 :                                         ++iter;
     212              :                                 }
     213           12 :                         }
     214              :                         // TLOG(TLVL_DEBUG + 32) << GetTraceName() << "receiveFragment: Polling fd to see if there's data" ;
     215           12 :                         int num_fds_ready = poll(&pollfds[0], fd_count, timeout_ms);
     216           12 :                         if (num_fds_ready <= 0)
     217              :                         {
     218           18 :                                 TLOG(TLVL_DEBUG + 34) << GetTraceName() << "No data on receive socket, returning RECV_TIMEOUT";
     219            9 :                                 return RECV_TIMEOUT;
     220              :                         }
     221              : 
     222            3 :                         size_t index = 0;
     223            3 :                         if (getLastActiveFD_(source_rank()) != -1)
     224              :                         {
     225            2 :                                 for (auto& pollfd : pollfds)
     226              :                                 {
     227            2 :                                         index++;
     228            2 :                                         if (pollfd.fd == getLastActiveFD_(source_rank()))
     229              :                                         {
     230            2 :                                                 break;
     231              :                                         }
     232              :                                 }
     233              :                         }
     234              : 
     235            3 :                         int active_index = -1;
     236            3 :                         int16_t anomolous_events = 0;
     237            3 :                         for (size_t ii = index; ii < index + pollfds.size(); ++ii)
     238              :                         {
     239            3 :                                 auto pollfd_index = (ii + index) % pollfds.size();
     240            3 :                                 setActiveFD_(source_rank(), pollfds[pollfd_index].fd);
     241            3 :                                 if ((pollfds[pollfd_index].revents & (POLLIN | POLLPRI)) != 0)
     242              :                                 {
     243            3 :                                         active_index = pollfd_index;
     244            3 :                                         break;
     245              :                                 }
     246            0 :                                 if ((pollfds[pollfd_index].revents & (POLLHUP | POLLERR)) != 0)
     247              :                                 {
     248            0 :                                         disconnect_receive_socket_("Poll returned POLLHUP or POLLERR, indicating problems with the sender.");
     249            0 :                                         continue;
     250              :                                 }
     251            0 :                                 else if ((pollfds[pollfd_index].revents & (POLLNVAL)) != 0)
     252              :                                 {
     253            0 :                                         TLOG(TLVL_DEBUG + 32) << GetTraceName() << "FD is closed, most likely because the peer went away. Removing from fd list.";
     254            0 :                                         disconnect_receive_socket_("FD is closed, most likely because the peer went away.");
     255            0 :                                         continue;
     256            0 :                                 }
     257            0 :                                 else if (pollfds[pollfd_index].revents != 0)
     258              :                                 {
     259            0 :                                         anomolous_events |= pollfds[pollfd_index].revents;
     260              :                                 }
     261              :                         }
     262              : 
     263            3 :                         if (active_index == -1)
     264              :                         {
     265            0 :                                 if (anomolous_events != 0)
     266              :                                 {
     267            0 :                                         TLOG(TLVL_DEBUG + 32) << GetTraceName() << "Wrong event received from a pollfd. Mask: " << static_cast<int>(anomolous_events);
     268              :                                 }
     269            0 :                                 setActiveFD_(source_rank(), -1);
     270            0 :                                 continue;
     271            0 :                         }
     272              : 
     273            3 :                         if (!done && timeout_usec > 0)
     274              :                         {
     275              :                                 // calc next timeout_ms (unless timed out)
     276            3 :                                 size_t delta_us = TimeUtils::gettimeofday_us() - start_time_us;
     277            3 :                                 if (delta_us > timeout_usec)
     278              :                                 {
     279            0 :                                         return RECV_TIMEOUT;
     280              :                                 }
     281            3 :                                 timeout_ms = ((timeout_usec - delta_us) + 999) / 1000;  // want at least 1 ms
     282              :                         }
     283           12 :                 }
     284            5 :                 if (loop_guard > 10) { usleep(1000); }
     285            5 :                 if (++loop_guard > 10010)
     286              :                 {
     287            0 :                         TLOG(TLVL_WARNING) << GetTraceName() << "loop guard triggered, returning RECV_TIMEOUT";
     288            0 :                         usleep(receive_err_wait_us_);
     289            0 :                         setActiveFD_(source_rank(), -1);
     290            0 :                         return RECV_TIMEOUT;
     291              :                 }
     292              : 
     293            5 :                 if (state == SocketState::Metadata)
     294              :                 {
     295              :                         // TLOG(TLVL_DEBUG + 32) << GetTraceName() << "Reading Message Header" ;
     296            3 :                         buff = &(mha[offset]);  // NOLINT(cppcoreguidelines-pro-bounds-constant-array-index)
     297            3 :                         byte_cnt = sizeof(MessHead) - offset;
     298              :                 }
     299              :                 else
     300              :                 {
     301              :                         // TLOG(TLVL_DEBUG + 32) << GetTraceName() << "Reading data" ;
     302            2 :                         buff = reinterpret_cast<uint8_t*>(&header) + offset;  // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast,cppcoreguidelines-pro-bounds-pointer-arithmetic)
     303            2 :                         byte_cnt = target_bytes - offset;
     304              :                 }
     305              :                 // if (byte_cnt > sizeof(MessHead))
     306              :                 //      {
     307              :                 //      TLOG(TLVL_ERROR) << "Invalid byte count for read (count=" << byte_cnt
     308              :                 //                       << ",offset=" << offset << ",mh.byte_count=" << mh.byte_count
     309              :                 //                       << "), skipping read and returning RECV_TIMEOUT";
     310              :                 //      return RECV_TIMEOUT;
     311              :                 // }
     312              : 
     313            5 :                 auto fd = getActiveFD_(source_rank());
     314            5 :                 if (byte_cnt > 0)
     315              :                 {
     316           10 :                         TLOG(TLVL_DEBUG + 35) << GetTraceName() << "Reading " << byte_cnt << " bytes from socket " << fd;
     317            5 :                         sts = read(fd, buff, byte_cnt);
     318           10 :                         TLOG(TLVL_DEBUG + 35) << GetTraceName() << "Done with read sts=" << sts;
     319              :                 }
     320            5 :                 if (sts > 0)
     321              :                 {
     322            5 :                         loop_guard = 0;
     323            5 :                         last_recv_time_ = std::chrono::steady_clock::now();
     324              :                 }
     325              : 
     326           10 :                 TLOG(TLVL_DEBUG + 36) << GetTraceName() << "state=" << static_cast<int>(state) << " read=" << sts;
     327            5 :                 if (sts < 0 && errno != EAGAIN)
     328              :                 {
     329            0 :                         TLOG(TLVL_WARNING) << GetTraceName() << "Error on receive, closing socket " << fd
     330            0 :                                            << " (errno=" << errno << ": " << strerror(errno) << ")";
     331            0 :                         disconnect_receive_socket_("Error on receive");
     332            0 :                 }
     333            5 :                 else if (sts <= 0 && errno == EAGAIN)
     334              :                 {
     335            0 :                         if (!noDataWarningSent)
     336              :                         {
     337            0 :                                 TLOG(TLVL_WARNING) << GetTraceName() << "No data received, is the sender still sending?!?";
     338            0 :                                 noDataWarningSent = true;
     339              :                         }
     340            0 :                         if (TimeUtils::GetElapsedTime(last_recv_time_) > receive_disconnected_wait_s_)
     341              :                         {
     342            0 :                                 TLOG(TLVL_ERROR) << GetTraceName() << "No data received within timeout, aborting!";
     343            0 :                                 return RECV_TIMEOUT;
     344              :                         }
     345            0 :                 }
     346              :                 else
     347              :                 {
     348              :                         // see if we're done (with this state)
     349           10 :                         TLOG(TLVL_DEBUG + 36) << GetTraceName() << "Checking for complete sts=" << sts << ", offset=" << offset << ", target_bytes=" << target_bytes;
     350            5 :                         sts = offset += sts;
     351            5 :                         if (sts >= target_bytes)
     352              :                         {
     353           10 :                                 TLOG(TLVL_DEBUG + 36) << GetTraceName() << "Target read bytes reached. Changing state";
     354            5 :                                 offset = 0;
     355            5 :                                 if (state == SocketState::Metadata)
     356              :                                 {
     357            3 :                                         state = SocketState::Data;
     358            3 :                                         mh.byte_count = ntohl(mh.byte_count);
     359            3 :                                         mh.source_id = ntohs(mh.source_id);
     360            3 :                                         target_bytes = mh.byte_count;
     361            6 :                                         TLOG(TLVL_DEBUG + 36) << GetTraceName() << "Expected header size = " << target_bytes << ", sizeof(RawFragmentHeader) = " << sizeof(artdaq::detail::RawFragmentHeader);
     362              :                                         // assert(target_bytes == sizeof(artdaq::detail::RawFragmentHeader) || target_bytes == 0);
     363              : 
     364            3 :                                         if (mh.message_type == MessHead::stop_v0)
     365              :                                         {
     366            2 :                                                 disconnect_receive_socket_("Stop Message received.");
     367              :                                         }
     368            2 :                                         else if (mh.message_type == MessHead::data_v0 || mh.message_type == MessHead::data_more_v0)
     369              :                                         {
     370            0 :                                                 TLOG(TLVL_WARNING) << GetTraceName() << "Message header indicates that Fragment data follows when I was expecting a Fragment header!";
     371            0 :                                                 disconnect_receive_socket_("Desync detected");
     372              :                                         }
     373              : 
     374            3 :                                         if (target_bytes == 0)
     375              :                                         {
     376              :                                                 // Probably a stop_v0, return timeout so we can try again.
     377            1 :                                                 return RECV_TIMEOUT;
     378              :                                         }
     379              :                                 }
     380              :                                 else
     381              :                                 {
     382            2 :                                         ret_rank = source_rank();
     383            4 :                                         TLOG(TLVL_DEBUG + 35) << GetTraceName() << "done sts=" << sts << " src=" << ret_rank;
     384            4 :                                         TLOG(TLVL_DEBUG + 36) << GetTraceName() << "Done receiving fragment header. Moving into output.";
     385              : 
     386            2 :                                         done = true;  // no more polls
     387              :                                                       // break; // no more read of ready fds
     388              : 
     389              :                                         {
     390            2 :                                                 std::lock_guard<std::mutex> lk(fd_mutex_);
     391            2 :                                                 first_fragment_received_[source_rank()].insert(fd);
     392            2 :                                         }
     393              :                                 }
     394              :                         }
     395              :                 }
     396              : 
     397              :         }  // while(!done)...poll
     398              : 
     399            4 :         TLOG(TLVL_DEBUG + 34) << GetTraceName() << "Returning " << ret_rank;
     400            2 :         return ret_rank;
     401              : }
     402              : 
     403            1 : void artdaq::TCPSocketTransfer::disconnect_receive_socket_(const std::string& msg)
     404              : {
     405            1 :         std::lock_guard<std::mutex> lk(fd_mutex_);
     406            1 :         auto fd = active_receive_fds_[source_rank()];
     407            4 :         TLOG(TLVL_WARNING) << GetTraceName() << "disconnect_receive_socket_: " << msg << " Closing socket " << fd << " for rank " << source_rank();
     408            1 :         close(fd);
     409            1 :         if (connected_fds_.count(source_rank()) != 0u)
     410              :         {
     411            1 :                 connected_fds_[source_rank()].erase(fd);
     412              :         }
     413            1 :         if (first_fragment_received_.count(source_rank()) != 0)
     414              :         {
     415            1 :                 first_fragment_received_[source_rank()].erase(fd);
     416              :         }
     417            1 :         active_receive_fds_[source_rank()] = -1;
     418            2 :         TLOG(TLVL_DEBUG + 32) << GetTraceName() << "disconnect_receive_socket_: There are now " << connected_fds_[source_rank()].size() << " active senders.";
     419            1 : }
     420              : 
     421            2 : int artdaq::TCPSocketTransfer::receiveFragmentData(RawDataType* destination, size_t /*wordCount*/)
     422              : {
     423            4 :         TLOG(TLVL_DEBUG + 39) << GetTraceName() << "receiveFragmentData: BEGIN";
     424            2 :         int ret_rank = RECV_TIMEOUT;
     425            2 :         if (getActiveFD_(source_rank()) == -1)
     426              :         {  // what if just listen_fd???
     427            0 :                 TLOG(TLVL_ERROR) << GetTraceName() << "receiveFragmentData: Receive socket not connected, returning RECV_TIMEOUT (Will result in \"Unexpected return code error\")";
     428            0 :                 return RECV_TIMEOUT;
     429              :         }
     430              : 
     431              :         // void* buff=alloca(max_fragment_size_words_*8);
     432              :         uint8_t* buff;
     433            2 :         size_t byte_cnt = 0;
     434              :         int sts;
     435            2 :         size_t offset = 0;
     436            2 :         SocketState state = SocketState::Metadata;
     437            2 :         size_t target_bytes = sizeof(MessHead);
     438              : 
     439              :         pollfd pollfd_s;
     440            2 :         pollfd_s.events = POLLIN | POLLPRI | POLLERR;
     441            2 :         pollfd_s.fd = getActiveFD_(source_rank());
     442              : 
     443            2 :         int loop_guard = 0;
     444            2 :         bool done = false;
     445            2 :         bool noDataWarningSent = false;
     446            2 :         last_recv_time_ = std::chrono::steady_clock::now();
     447            6 :         while (!done)
     448              :         {
     449            8 :                 TLOG(TLVL_DEBUG + 33) << GetTraceName() << "receiveFragmentData: Polling fd to see if there's data";
     450            4 :                 int num_fds_ready = poll(&pollfd_s, 1, 1000);
     451            8 :                 TLOG(TLVL_DEBUG + 33) << GetTraceName() << "receiveFragmentData: Polled fd to see if there's data"
     452            4 :                                       << ", num_fds_ready = " << num_fds_ready;
     453            4 :                 if (num_fds_ready <= 0)
     454              :                 {
     455            0 :                         if (num_fds_ready == 0)
     456              :                         {
     457            0 :                                 TLOG(TLVL_WARNING) << GetTraceName() << "receiveFragmentData: No data from " << source_rank() << " in " << TimeUtils::GetElapsedTimeMilliseconds(last_recv_time_) << " ms!"
     458            0 :                                                    << " State = " << (state == SocketState::Metadata ? "Metadata" : "Data") << ", recvd/total=" << offset << "/" << target_bytes << " (delta=" << target_bytes - offset << ")";
     459              : 
     460            0 :                                 if (TimeUtils::GetElapsedTime(last_recv_time_) > receive_disconnected_wait_s_)
     461              :                                 {
     462            0 :                                         TLOG(TLVL_WARNING) << GetTraceName() << "receiveFragmentData: No data received within timeout (" << TimeUtils::GetElapsedTime(last_recv_time_) << " / " << receive_disconnected_wait_s_ << " ), returning RECV_TIMEOUT";
     463            0 :                                         disconnect_receive_socket_("No data on this socket within timeout");
     464            0 :                                         return RECV_TIMEOUT;
     465              :                                 }
     466            0 :                                 continue;
     467            0 :                         }
     468              : 
     469            0 :                         TLOG(TLVL_ERROR) << "Error in poll: errno=" << errno;
     470            0 :                         break;
     471              :                 }
     472              : 
     473            4 :                 last_recv_time_ = std::chrono::steady_clock::now();
     474              : 
     475            4 :                 if ((pollfd_s.revents & (POLLIN | POLLPRI)) != 0)
     476              :                 {
     477              :                         // Expected, don't have to check revents any further
     478              :                 }
     479            0 :                 else if ((pollfd_s.revents & (POLLNVAL)) != 0)
     480              :                 {
     481            0 :                         disconnect_receive_socket_("FD is closed, most likely because the peer went away.");
     482            0 :                         break;
     483              :                 }
     484            0 :                 else if ((pollfd_s.revents & (POLLHUP | POLLERR)) != 0)
     485              :                 {
     486            0 :                         disconnect_receive_socket_("Poll returned POLLHUP or POLLERR, indicating problems with the sender.");
     487            0 :                         break;
     488              :                 }
     489              :                 else
     490              :                 {
     491            0 :                         TLOG(TLVL_WARNING) << GetTraceName() << "receiveFragmentData: Wrong event received from pollfd: " << pollfd_s.revents;
     492            0 :                         disconnect_receive_socket_("Wrong event received from pollfd.");
     493            0 :                         break;
     494              :                 }
     495              : 
     496            4 :                 if (state == SocketState::Metadata)
     497              :                 {
     498              :                         // TLOG(TLVL_DEBUG + 32) << GetTraceName() << "receiveFragmentData: Reading Message Header" ;
     499            2 :                         buff = &(mha[offset]);  // NOLINT(cppcoreguidelines-pro-bounds-constant-array-index)
     500            2 :                         byte_cnt = sizeof(MessHead) - offset;
     501              :                 }
     502              :                 else
     503              :                 {
     504              :                         // TLOG(TLVL_DEBUG + 32) << GetTraceName() << "receiveFragmentData: Reading data" ;
     505            2 :                         buff = reinterpret_cast<uint8_t*>(destination) + offset;  // NOLINT(cppcoreguidelines-pro-bounds-pointer-arithmetic,cppcoreguidelines-pro-type-reinterpret-cast)
     506            2 :                         byte_cnt = mh.byte_count - offset;
     507              :                 }
     508              : 
     509            8 :                 TLOG(TLVL_DEBUG + 38) << GetTraceName() << "receiveFragmentData: Reading " << byte_cnt << " bytes from socket into " << static_cast<void*>(buff);
     510            4 :                 sts = read(getActiveFD_(source_rank()), buff, byte_cnt);
     511              :                 // TLOG(TLVL_DEBUG + 32) << GetTraceName() << "receiveFragmentData: Done with read" ;
     512              : 
     513            8 :                 TLOG(TLVL_DEBUG + 38) << GetTraceName() << "recvFragment state=" << static_cast<int>(state) << " read=" << sts;
     514              : 
     515            4 :                 if (sts == 0 || (sts < 0 && errno == EAGAIN))
     516              :                 {
     517            0 :                         sts = 0;  // Treat EAGAIN as receiving no data
     518            0 :                         if (loop_guard > 10) { usleep(1000); }
     519            0 :                         if (++loop_guard > 10010)
     520              :                         {
     521            0 :                                 TLOG(TLVL_WARNING) << GetTraceName() << "receiveFragmentData: loop guard triggered, returning RECV_TIMEOUT";
     522            0 :                                 setActiveFD_(source_rank(), -1);
     523            0 :                                 return RECV_TIMEOUT;
     524              :                         }
     525            0 :                 }
     526            4 :                 else if (sts > 0)
     527              :                 {
     528            4 :                         loop_guard = 0;
     529            4 :                         last_recv_time_ = std::chrono::steady_clock::now();
     530              :                 }
     531              : 
     532            4 :                 if (sts < 0)
     533              :                 {
     534            0 :                         TLOG(TLVL_WARNING) << GetTraceName() << "receiveFragmentData: Error on receive, closing socket"
     535            0 :                                            << " (errno=" << errno << ": " << strerror(errno) << ")";
     536            0 :                         disconnect_receive_socket_("Error on receive");
     537              :                 }
     538            4 :                 else if (sts == 0)
     539              :                 {
     540            0 :                         if (!noDataWarningSent)
     541              :                         {
     542            0 :                                 TLOG(TLVL_WARNING) << GetTraceName() << "receiveFragmentData: No data received, is the sender still sending?!?";
     543            0 :                                 noDataWarningSent = true;
     544              :                         }
     545            0 :                         if (TimeUtils::GetElapsedTime(last_recv_time_) > receive_disconnected_wait_s_)
     546              :                         {
     547            0 :                                 TLOG(TLVL_ERROR) << GetTraceName() << "receiveFragmentData: No data received within timeout, aborting!";
     548            0 :                                 return RECV_TIMEOUT;
     549              :                         }
     550              :                 }
     551              :                 else
     552              :                 {
     553              :                         // see if we're done (with this state)
     554            4 :                         sts = offset += sts;
     555            4 :                         if (static_cast<size_t>(sts) >= target_bytes)
     556              :                         {
     557            8 :                                 TLOG(TLVL_DEBUG + 42) << GetTraceName() << "receiveFragmentData: Target read bytes reached. Changing state";
     558            4 :                                 offset = 0;
     559            4 :                                 if (state == SocketState::Metadata)
     560              :                                 {
     561            2 :                                         state = SocketState::Data;
     562            2 :                                         mh.byte_count = ntohl(mh.byte_count);
     563            2 :                                         mh.source_id = ntohs(mh.source_id);
     564            2 :                                         target_bytes = mh.byte_count;
     565              : 
     566            2 :                                         if (mh.message_type == MessHead::header_v0)
     567              :                                         {
     568            0 :                                                 TLOG(TLVL_WARNING) << GetTraceName() << "receiveFragmentData: Message header indicates that a Fragment header follows when I was expecting Fragment data!";
     569            0 :                                                 disconnect_receive_socket_("Desync detected");
     570              :                                         }
     571              :                                 }
     572              :                                 else
     573              :                                 {
     574            2 :                                         ret_rank = source_rank();
     575            4 :                                         TLOG(TLVL_DEBUG + 41) << GetTraceName() << "receiveFragmentData done sts=" << sts << " src=" << ret_rank;
     576            4 :                                         TLOG(TLVL_DEBUG + 39) << GetTraceName() << "receiveFragmentData: Done receiving fragment. Moving into output.";
     577              : 
     578              : #if USE_ACKS
     579              :                                         send_ack_(active_receive_fd_);
     580              : #endif
     581              : 
     582            2 :                                         done = true;  // no more polls
     583              :                                                       // break; // no more read of ready fds
     584              :                                 }
     585              :                         }
     586              :                 }
     587              : 
     588              :                 // Check if we were asked to do a 0-size receive
     589            4 :                 if (target_bytes == 0 && state == SocketState::Data)
     590              :                 {
     591            0 :                         ret_rank = source_rank();
     592            0 :                         TLOG(TLVL_DEBUG + 41) << GetTraceName() << "receiveFragmentData done sts=" << sts << " src=" << ret_rank;
     593            0 :                         TLOG(TLVL_DEBUG + 39) << GetTraceName() << "receiveFragmentData: Done receiving fragment. Moving into output.";
     594              : 
     595              : #if USE_ACKS
     596              :                         send_ack_(active_receive_fd_);
     597              : #endif
     598              : 
     599            0 :                         done = true;  // no more polls
     600              :                 }
     601              : 
     602              :         }  // while(!done)...poll
     603              : 
     604            2 :         setLastActiveFD_(source_rank(), getActiveFD_(source_rank()));
     605            2 :         setActiveFD_(source_rank(), -1);
     606              : 
     607            4 :         TLOG(TLVL_DEBUG + 39) << GetTraceName() << "receiveFragmentData: Returning rank " << ret_rank;
     608            2 :         return ret_rank;
     609              : }
     610              : 
     611            2 : bool artdaq::TCPSocketTransfer::isRunning()
     612              : {
     613            2 :         switch (role())
     614              :         {
     615            0 :                 case TransferInterface::Role::kSend:
     616            0 :                         return send_fd_ != -1;
     617            2 :                 case TransferInterface::Role::kReceive:
     618            2 :                         auto count = getConnectedFDCount_(source_rank());
     619            4 :                         TLOG(TLVL_DEBUG + 32) << GetTraceName() << "isRunning: There are " << count << " fds connected.";
     620            2 :                         return count > 0;
     621              :         }
     622            0 :         return false;
     623              : }
     624              : 
     625            3 : void artdaq::TCPSocketTransfer::flush_buffers()
     626              : {
     627            3 :         std::set<int> fds;
     628            3 :         auto rank = TransferInterface::source_rank();
     629              :         {
     630            3 :                 std::lock_guard<std::mutex> lk(fd_mutex_);
     631            3 :                 if (connected_fds_.count(rank) != 0)
     632              :                 {
     633            1 :                         fds = connected_fds_[rank];
     634            1 :                         connected_fds_.erase(rank);
     635              :                 }
     636            3 :                 if (first_fragment_received_.count(rank))
     637              :                 {
     638            1 :                         first_fragment_received_.erase(rank);
     639              :                 }
     640            3 :         }
     641              : 
     642              :         char discard_buf[0x1000];
     643            3 :         for (auto& fd : fds)
     644              :         {
     645            0 :                 TLOG(TLVL_INFO) << GetTraceName() << "flush_buffers: Checking for data in socket " << fd << " for rank " << rank;
     646            0 :                 size_t bytes_read = 0;
     647            0 :                 while (int sts = static_cast<int>(read(fd, discard_buf, sizeof(discard_buf)) > 0))
     648              :                 {
     649            0 :                         bytes_read += sts;
     650            0 :                 }
     651            0 :                 if (bytes_read > 0)
     652              :                 {
     653            0 :                         TLOG(TLVL_WARNING) << GetTraceName() << "flush_buffers: Flushed " << bytes_read << " bytes from socket " << fd << " for rank " << rank;
     654              :                 }
     655            0 :                 TLOG(TLVL_INFO) << GetTraceName() << "flush_buffers: Closing socket " << fd << " for rank " << rank;
     656            0 :                 close(fd);
     657              :         }
     658            3 :         active_receive_fds_[rank] = -1;
     659            3 :         last_active_receive_fds_[rank] = -1;
     660            3 : }
     661              : 
     662              : // Send the given Fragment. Return the rank of the destination to which
     663              : // the Fragment was sent OR -1 if to none.
     664            2 : artdaq::TransferInterface::CopyStatus artdaq::TCPSocketTransfer::sendFragment_(Fragment&& frag, size_t send_timeout_usec)
     665              : {
     666            4 :         TLOG(TLVL_DEBUG + 42) << GetTraceName() << "sendFragment begin send of fragment with sequenceID=" << frag.sequenceID();
     667            2 :         artdaq::Fragment grab_ownership_frag = std::move(frag);
     668              : 
     669            2 :         reconnect_();
     670            2 :         if (send_fd_ == -1 && connection_was_lost_)
     671              :         {
     672            0 :                 TLOG(TLVL_INFO) << GetTraceName() << "reconnection attempt failed, returning quickly.";
     673            0 :                 return TransferInterface::CopyStatus::kErrorNotRequiringException;
     674              :         }
     675              : 
     676              :         // Send Fragment Header
     677              : 
     678              : #if USE_ACKS
     679              :         // Wait for fragments to be received
     680              :         while (static_cast<size_t>(send_ack_diff_) > buffer_count_) usleep(10000);
     681              : #endif
     682              : 
     683            2 :         iovec iov = {static_cast<void*>(grab_ownership_frag.headerAddress()),
     684            2 :                      detail::RawFragmentHeader::num_words() * sizeof(RawDataType)};
     685              : 
     686            2 :         auto sts = sendData_(&iov, 1, send_retry_timeout_us_, true);
     687            2 :         auto start_time = std::chrono::steady_clock::now();
     688              :         // If it takes more than 10 seconds to write a Fragment header, give up
     689            2 :         while (sts == CopyStatus::kTimeout && (send_timeout_usec == 0 || TimeUtils::GetElapsedTimeMicroseconds(start_time) < send_timeout_usec) && TimeUtils::GetElapsedTimeMicroseconds(start_time) < 10000000)
     690              :         {
     691            0 :                 TLOG(TLVL_DEBUG + 43) << GetTraceName() << "sendFragment: Timeout sending fragment";
     692            0 :                 sts = sendData_(&iov, 1, send_retry_timeout_us_, true);
     693            0 :                 usleep(1000);
     694              :         }
     695            2 :         if (sts != CopyStatus::kSuccess)
     696              :         {
     697            0 :                 return sts;
     698              :         }
     699              : 
     700              :         // Send Fragment Data
     701              : 
     702            2 :         iov = {static_cast<void*>(grab_ownership_frag.headerAddress() + detail::RawFragmentHeader::num_words()),  // NOLINT(cppcoreguidelines-pro-bounds-pointer-arithmetic)
     703            2 :                grab_ownership_frag.sizeBytes() - detail::RawFragmentHeader::num_words() * sizeof(RawDataType)};
     704            2 :         sts = sendData_(&iov, 1, send_retry_timeout_us_);
     705            2 :         start_time = std::chrono::steady_clock::now();
     706            2 :         while (sts == CopyStatus::kTimeout && (send_timeout_usec == 0 || TimeUtils::GetElapsedTimeMicroseconds(start_time) < send_timeout_usec) && TimeUtils::GetElapsedTimeMicroseconds(start_time) < 10000000)
     707              :         {
     708            0 :                 TLOG(TLVL_DEBUG + 43) << GetTraceName() << "sendFragment: Timeout sending fragment";
     709            0 :                 sts = sendData_(&iov, 1, send_retry_timeout_us_);
     710            0 :                 usleep(1000);
     711              :         }
     712              : 
     713              : #if USE_ACKS
     714              :         send_ack_diff_++;
     715              : #endif
     716              : 
     717            4 :         TLOG(TLVL_DEBUG + 42) << GetTraceName() << "sendFragment returning " << CopyStatusToString(sts);
     718            2 :         return sts;
     719            2 : }
     720              : 
     721            0 : artdaq::TransferInterface::CopyStatus artdaq::TCPSocketTransfer::sendData_(const void* buf, size_t bytes, size_t send_timeout_usec, bool isHeader)
     722              : {
     723            0 :         TLOG(TLVL_DEBUG + 32) << GetTraceName() << "sendData_ Converting buf to iovec";
     724            0 :         iovec iov = {const_cast<void*>(buf), bytes};  // NOLINT(cppcoreguidelines-pro-type-const-cast)
     725            0 :         return sendData_(&iov, 1, send_timeout_usec, isHeader);
     726              : }
     727              : 
     728            4 : artdaq::TransferInterface::CopyStatus artdaq::TCPSocketTransfer::sendData_(const struct iovec* iov, int iovcnt, size_t send_timeout_usec, bool isHeader)
     729              : {
     730              :         // check all connected??? -- currently just check fd!=-1
     731            4 :         if (send_fd_ == -1)
     732              :         {
     733            0 :                 if (timeoutMessageArmed_)
     734              :                 {
     735            0 :                         TLOG(TLVL_DEBUG + 32) << GetTraceName() << "sendData_: Send fd is not open. Returning kTimeout";
     736            0 :                         timeoutMessageArmed_ = false;
     737              :                 }
     738            0 :                 return CopyStatus::kTimeout;
     739              :         }
     740            4 :         timeoutMessageArmed_ = true;
     741            8 :         TLOG(TLVL_DEBUG + 44) << GetTraceName() << "send_timeout_usec is " << send_timeout_usec << ", currently unused.";
     742              : 
     743              :         // TLOG(TLVL_DEBUG + 32) << GetTraceName() << "sendData_: Determining write size" ;
     744            4 :         uint32_t total_to_write_bytes = 0;
     745            8 :         std::vector<iovec> iov_in(iovcnt + 1);  // need contiguous (for the unlike case that only partial MH
     746            4 :         std::vector<iovec> iovv(iovcnt + 2);    // 1 more for mh and another one for any partial
     747              :         int ii;
     748            8 :         for (ii = 0; ii < iovcnt; ++ii)
     749              :         {
     750            4 :                 iov_in[ii + 1] = iov[ii];                 // NOLINT(cppcoreguidelines-pro-bounds-pointer-arithmetic)
     751            4 :                 total_to_write_bytes += iov[ii].iov_len;  // NOLINT(cppcoreguidelines-pro-bounds-pointer-arithmetic)
     752              :         }
     753              :         // TLOG(TLVL_DEBUG + 32) << GetTraceName() << "sendData_: Constructing Message Header" ;
     754            4 :         MessHead mh = {0, isHeader ? MessHead::header_v0 : MessHead::data_v0, htons(source_rank()), {htonl(total_to_write_bytes)}};
     755            4 :         iov_in[0].iov_base = &mh;
     756            4 :         iov_in[0].iov_len = sizeof(mh);
     757            4 :         total_to_write_bytes += sizeof(mh);
     758              : 
     759            4 :         ssize_t sts = 0;
     760            4 :         ssize_t total_written_bytes = 0;
     761            4 :         ssize_t per_write_max_bytes = (32 * 1024);
     762              : 
     763            4 :         size_t in_iov_idx = 0;  // only increment this when we know the associated data has been xferred
     764            4 :         size_t out_iov_idx = 0;
     765            4 :         ssize_t this_write_bytes = 0;
     766              : 
     767              :         do
     768              :         {
     769              :                 // The first out_iov may be set at the end of the previous loop.
     770              :                 // iov looping from below (b/c of the latter, we need to check this_write_bytes)
     771            4 :                 for (;
     772           12 :                      (in_iov_idx + out_iov_idx) < iov_in.size() && this_write_bytes < per_write_max_bytes;
     773              :                      ++out_iov_idx)
     774              :                 {
     775            8 :                         this_write_bytes += iov_in[in_iov_idx + out_iov_idx].iov_len;
     776            8 :                         iovv[out_iov_idx] = iov_in[in_iov_idx + out_iov_idx];
     777              :                 }
     778            4 :                 if (this_write_bytes > per_write_max_bytes)
     779              :                 {
     780            0 :                         iovv[out_iov_idx - 1].iov_len -= this_write_bytes - per_write_max_bytes;
     781            0 :                         this_write_bytes = per_write_max_bytes;
     782              :                 }
     783              : 
     784              :                 // need to do blocking algorithm -- including throttled block notifications
     785            4 :         do_again:
     786              : #ifndef __OPTIMIZE__  // This can be an expensive TRACE call (even if disabled) due to multiplicity of calls
     787            8 :                 TLOG(TLVL_DEBUG + 44) << GetTraceName() << "sendFragment b4 writev " << std::setw(7) << total_written_bytes << " total_written_bytes send_fd_=" << send_fd_ << " in_idx=" << in_iov_idx
     788            4 :                                       << " iovcnt=" << out_iov_idx << " 1st.len=" << iovv[0].iov_len;
     789              : #endif
     790              : // TLOG(TLVL_DEBUG + 32) << GetTraceName() << " calling writev" ;
     791              : #if USE_SENDMSG
     792              :                 msghdr msg;
     793            4 :                 memset(&msg, 0, sizeof(msghdr));
     794            4 :                 msg.msg_iov = &(iovv[0]);
     795            4 :                 msg.msg_iovlen = out_iov_idx;  // at this point out_iov_idx is really the count (not an idx per se)
     796            4 :                 sts = sendmsg(send_fd_, &msg, MSG_NOSIGNAL | (blocking != 0u ? 0 : MSG_DONTWAIT));
     797              : #else
     798              :                 sts = writev(send_fd_, &(iovv[0]), out_iov_idx);  // SIGPIPE may occur -- need signal handler or mask/ignore
     799              : #endif
     800              :                 // TLOG(TLVL_DEBUG + 32) << GetTraceName() << " done with writev" ;
     801              : 
     802            4 :                 if (sts == -1)
     803              :                 {
     804            0 :                         if (errno == EAGAIN /* same as EWOULDBLOCK */)
     805              :                         {
     806            0 :                                 TLOG(TLVL_DEBUG + 32) << GetTraceName() << "sendFragment EWOULDBLOCK";
     807            0 :                                 blocking = 1u;
     808              : 
     809            0 :                                 fcntl(send_fd_, F_SETFL, 0);  // clear O_NONBLOCK
     810              : 
     811              :                                 // NOTE: YES -- could drop here
     812            0 :                                 goto do_again;
     813              :                         }
     814            0 :                         TLOG(TLVL_WARNING) << GetTraceName() << "sendFragment_: WRITE ERROR " << errno << ": " << strerror(errno);
     815            0 :                         connect_state = 0;  // any write error closes
     816            0 :                         close(send_fd_);
     817            0 :                         send_fd_ = -1;
     818            0 :                         connection_was_lost_ = true;
     819            0 :                         return TransferInterface::CopyStatus::kErrorNotRequiringException;
     820              :                 }
     821            4 :                 if (sts != this_write_bytes)
     822              :                 {
     823              :                         // we'll loop around -- with
     824            0 :                         TLOG(TLVL_DEBUG + 32) << GetTraceName() << "sendFragment writev sts(" << sts << ")!=requested_send_bytes(" << this_write_bytes << ")";
     825            0 :                         total_written_bytes += sts;  // add sts to total_written_bytes now as sts is adjusted next
     826              :                         // find which iovs are done
     827            0 :                         for (ii = 0; static_cast<size_t>(sts) >= iovv[ii].iov_len; ++ii)
     828              :                         {
     829            0 :                                 sts -= iovv[ii].iov_len;
     830              :                         }
     831            0 :                         in_iov_idx += ii;                                                    // done with these in_iovs
     832            0 :                         iovv[ii].iov_len -= sts;                                             // adjust partial iov
     833            0 :                         iovv[ii].iov_base = static_cast<uint8_t*>(iovv[ii].iov_base) + sts;  // adjust partial iov // NOLINT(cppcoreguidelines-pro-bounds-pointer-arithmetic)
     834              : 
     835              :                         // add more to get up to per_write_max_bytes
     836            0 :                         out_iov_idx = 0;
     837            0 :                         if (ii != 0)
     838              :                         {
     839            0 :                                 iovv[out_iov_idx] = iovv[ii];
     840              :                         }
     841              :                         // starting over
     842            0 :                         this_write_bytes = iovv[out_iov_idx].iov_len;
     843              :                         // add any left over from appropriate in_iov_idx --
     844              :                         // i.e. match this out_iov with the in_iov that was used to
     845              :                         // initialize it; see how close the out base+len is to in base+len
     846              :                         // check !>per_write_max_bytes
     847            0 :                         auto additional = (reinterpret_cast<uintptr_t>(iov_in[in_iov_idx].iov_base) + iov_in[in_iov_idx].iov_len) - (reinterpret_cast<uintptr_t>(iovv[out_iov_idx].iov_base) + iovv[out_iov_idx].iov_len);  // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
     848            0 :                         if (additional != 0u)
     849              :                         {
     850            0 :                                 iovv[out_iov_idx].iov_len += additional;
     851            0 :                                 this_write_bytes += additional;
     852            0 :                                 if (this_write_bytes > per_write_max_bytes)
     853              :                                 {
     854            0 :                                         iovv[out_iov_idx].iov_len -= this_write_bytes - per_write_max_bytes;
     855            0 :                                         this_write_bytes = per_write_max_bytes;
     856              :                                 }
     857              :                         }
     858            0 :                         ++out_iov_idx;  // done with
     859            0 :                         TLOG(TLVL_DEBUG + 33) << GetTraceName() << "sendFragment writev sts!=: this_write_bytes=" << this_write_bytes
     860            0 :                                               << " out_iov_idx=" << out_iov_idx
     861            0 :                                               << " additional=" << additional
     862            0 :                                               << " ii=" << ii;
     863              :                 }
     864              :                 else
     865              :                 {
     866              : #ifndef __OPTIMIZE__  // This can be an expensive TRACE call (even if disabled) due to multiplicity of calls
     867            8 :                         TLOG(TLVL_DEBUG + 33) << GetTraceName() << "sendFragment writev sts(" << sts << ")==requested_send_bytes(" << this_write_bytes << ")";
     868              : #endif
     869            4 :                         total_written_bytes += sts;
     870            4 :                         --out_iov_idx;                                                                                               // make it the index of the last iovv
     871            4 :                         iovv[out_iov_idx].iov_base = static_cast<uint8_t*>(iovv[out_iov_idx].iov_base) + iovv[out_iov_idx].iov_len;  // NOLINT(cppcoreguidelines-pro-bounds-pointer-arithmetic)
     872            4 :                         iovv[out_iov_idx].iov_len = 0;
     873            4 :                         in_iov_idx += out_iov_idx;  // at least this many complete (one more if "last iovv" is complete
     874            4 :                         this_write_bytes = 0;
     875              :                         // need to check last iovv against appropriate iov_in
     876            4 :                         auto additional = (reinterpret_cast<uintptr_t>(iov_in[in_iov_idx].iov_base) + iov_in[in_iov_idx].iov_len) - (reinterpret_cast<uintptr_t>(iovv[out_iov_idx].iov_base) + iovv[out_iov_idx].iov_len);  // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
     877            4 :                         if (additional != 0u)
     878              :                         {
     879            0 :                                 iovv[out_iov_idx].iov_len += additional;
     880            0 :                                 this_write_bytes += additional;
     881            0 :                                 if (this_write_bytes > per_write_max_bytes)
     882              :                                 {
     883            0 :                                         iovv[out_iov_idx].iov_len -= this_write_bytes - per_write_max_bytes;
     884            0 :                                         this_write_bytes = per_write_max_bytes;
     885              :                                 }
     886            0 :                                 if (out_iov_idx != 0)
     887              :                                 {
     888            0 :                                         iovv[0] = iovv[out_iov_idx];
     889              :                                 }
     890            0 :                                 out_iov_idx = 1;
     891              :                         }
     892              :                         else
     893              :                         {
     894            4 :                                 ++in_iov_idx;
     895            4 :                                 out_iov_idx = 0;
     896              :                         }
     897              :                 }
     898            4 :         } while (total_written_bytes < total_to_write_bytes);
     899            4 :         if (total_written_bytes > total_to_write_bytes)
     900              :         {
     901            0 :                 TLOG(TLVL_ERROR) << GetTraceName() << "sendFragment program error: too many bytes transferred";
     902              :         }
     903              : 
     904            4 :         if (blocking != 0u)
     905              :         {
     906            0 :                 blocking = 0u;
     907            0 :                 fcntl(send_fd_, F_SETFL, O_NONBLOCK);  // set O_NONBLOCK
     908              :         }
     909            4 :         sts = total_written_bytes - sizeof(MessHead);
     910              : 
     911            8 :         TLOG(TLVL_DEBUG + 44) << GetTraceName() << "sendFragment sts=" << sts;
     912            4 :         return TransferInterface::CopyStatus::kSuccess;
     913            4 : }
     914              : 
     915            1 : void artdaq::TCPSocketTransfer::connect_()
     916              : {
     917            1 :         auto start_time = std::chrono::steady_clock::now();
     918              : 
     919              :         // Retry a few times if we can't connect
     920            2 :         while (send_fd_ == -1 && TimeUtils::GetElapsedTimeMicroseconds(start_time) < send_retry_timeout_us_ * 10)
     921              :         {
     922            2 :                 TLOG(TLVL_DEBUG + 32) << GetTraceName() << "Connecting sender socket";
     923            1 :                 int sndbuf_bytes = static_cast<int>(sndbuf_);
     924            1 :                 if (sndbuf_ > INT_MAX)
     925              :                 {
     926            0 :                         sndbuf_bytes = INT_MAX;
     927            0 :                         TLOG(TLVL_WARNING) << "Requested SNDBUF " << sndbuf_ << " too large, setting to INT_MAX: " << INT_MAX;
     928              :                 }
     929            2 :                 TLOG(TLVL_DEBUG + 32) << "Requested SNDBUF is " << sndbuf_bytes;
     930              : 
     931            1 :                 send_fd_ = TCPConnect(hostMap_[destination_rank()].c_str(), port_, O_NONBLOCK, sndbuf_bytes);
     932            1 :                 if (send_fd_ == -1)
     933              :                 {
     934            0 :                         if (connection_was_lost_) { break; }
     935              : 
     936            0 :                         usleep(send_retry_timeout_us_);
     937              :                 }
     938              :         }
     939            1 :         connect_state = 0;
     940            1 :         blocking = 0;
     941            2 :         TLOG(TLVL_DEBUG + 32) << GetTraceName() << "connect_ " + hostMap_[destination_rank()] + ":" << port_ << " send_fd_=" << send_fd_;
     942            1 :         if (send_fd_ != -1)
     943              :         {
     944              :                 // write connect msg
     945            2 :                 TLOG(TLVL_DEBUG + 32) << GetTraceName() << "connect_: Writing connect message";
     946            1 :                 MessHead mh = {0, MessHead::connect_v0, htons(source_rank()), {htonl(CONN_MAGIC)}};
     947            1 :                 ssize_t sts = write(send_fd_, &mh, sizeof(mh));
     948            1 :                 if (sts == -1)
     949              :                 {
     950            0 :                         TLOG(TLVL_ERROR) << GetTraceName() << "connect_: Error writing connect message!";
     951              :                         // a write error here is completely unexpected!
     952            0 :                         connect_state = 0;
     953            0 :                         close(send_fd_);
     954            0 :                         send_fd_ = -1;
     955              :                 }
     956              :                 else
     957              :                 {
     958            4 :                         TLOG(TLVL_INFO) << GetTraceName() << "connect_: Successfully connected";
     959              :                         // consider it all connected/established
     960            1 :                         connect_state = 1;
     961            1 :                         connection_was_lost_ = false;
     962              :                 }
     963              : 
     964              : #if USE_ACKS
     965              :                 if (ack_listen_thread_ && ack_listen_thread_->joinable()) ack_listen_thread_->join();
     966              :                 TLOG(TLVL_INFO) << GetTraceName() << "Starting Ack Listener Thread";
     967              : 
     968              :                 try
     969              :                 {
     970              :                         ack_listen_thread_ = std::make_unique<boost::thread>(&TCPSocketTransfer::receive_acks_, this);
     971              :                         char tname[16];                                               // Size 16 - see man page pthread_setname_np(3) and/or prctl(2)
     972              :                         snprintf(tname, sizeof(tname) - 1, "%d-AckListen", my_rank);  // NOLINT
     973              :                         tname[sizeof(tname) - 1] = '\0';                              // assure term. snprintf is not too evil :)
     974              :                         auto handle = ack_listen_thread_.native_handle();
     975              :                         pthread_setname_np(handle, tname);
     976              :                 }
     977              :                 catch (const boost::exception& e)
     978              :                 {
     979              :                         TLOG(TLVL_ERROR) << "Caught boost::exception starting TCP Socket Ack Listen thread: " << boost::diagnostic_information(e) << ", errno=" << errno;
     980              :                         std::cerr << "Caught boost::exception starting TCP Socket Ack Listen thread: " << boost::diagnostic_information(e) << ", errno=" << errno << std::endl;
     981              :                         exit(5);
     982              :                 }
     983              : #endif
     984              :         }
     985            1 : }
     986              : 
     987            2 : void artdaq::TCPSocketTransfer::reconnect_()
     988              : {
     989            2 :         if (send_fd_ == -1 && role() == TransferInterface::Role::kSend)
     990              :         {
     991            0 :                 TLOG(TLVL_DEBUG + 33) << GetTraceName() << "check/reconnect";
     992            0 :                 return connect_();
     993              :         }
     994              : }
     995              : 
     996            2 : void artdaq::TCPSocketTransfer::start_listen_thread_()
     997              : {
     998            2 :         std::lock_guard<std::mutex> start_lock(listen_thread_mutex_);
     999            2 :         if (listen_thread_refcount_ == 0)
    1000              :         {
    1001            2 :                 if (listen_thread_ && listen_thread_->joinable())
    1002              :                 {
    1003            0 :                         listen_thread_->join();
    1004              :                 }
    1005            2 :                 listen_thread_refcount_ = 1;
    1006            8 :                 TLOG(TLVL_INFO) << GetTraceName() << "Starting Listener Thread";
    1007              : 
    1008              :                 try
    1009              :                 {
    1010            2 :                         listen_thread_ = std::make_unique<boost::thread>(&TCPSocketTransfer::listen_, port_, rcvbuf_);
    1011              :                         char tname[16];                                            // Size 16 - see man page pthread_setname_np(3) and/or prctl(2)
    1012            2 :                         snprintf(tname, sizeof(tname) - 1, "%d-Listen", my_rank);  // NOLINT
    1013            2 :                         tname[sizeof(tname) - 1] = '\0';                           // assure term. snprintf is not too evil :)
    1014            2 :                         auto handle = listen_thread_->native_handle();
    1015            2 :                         pthread_setname_np(handle, tname);
    1016              :                 }
    1017            0 :                 catch (const boost::exception& e)
    1018              :                 {
    1019            0 :                         TLOG(TLVL_ERROR) << "Caught boost::exception starting TCP Socket Listen thread: " << boost::diagnostic_information(e) << ", errno=" << errno;
    1020            0 :                         std::cerr << "Caught boost::exception starting TCP Socket Listen thread: " << boost::diagnostic_information(e) << ", errno=" << errno << std::endl;
    1021            0 :                         exit(5);
    1022            0 :                 }
    1023              :         }
    1024              :         else
    1025              :         {
    1026            0 :                 listen_thread_refcount_++;
    1027              :         }
    1028            2 : }
    1029              : 
    1030              : #if USE_ACKS
    1031              : void artdaq::TCPSocketTransfer::receive_acks_()
    1032              : {
    1033              :         while (send_fd_ >= 0)
    1034              :         {
    1035              :                 pollfd pollfd_s;
    1036              :                 pollfd_s.events = POLLIN | POLLPRI;
    1037              :                 pollfd_s.fd = send_fd_;
    1038              : 
    1039              :                 TLOG(TLVL_DEBUG + 48) << GetTraceName() << "receive_acks_: Polling fd to see if there's data";
    1040              :                 int num_fds_ready = poll(&pollfd_s, 1, 1000);
    1041              :                 if (num_fds_ready <= 0)
    1042              :                 {
    1043              :                         if (num_fds_ready == 0)
    1044              :                         {
    1045              :                                 TLOG(TLVL_DEBUG + 48) << GetTraceName() << "receive_acks_: No data on receive socket";
    1046              :                                 continue;
    1047              :                         }
    1048              : 
    1049              :                         TLOG(TLVL_ERROR) << "Error in poll: errno=" << errno;
    1050              :                         break;
    1051              :                 }
    1052              : 
    1053              :                 if (pollfd_s.revents & (POLLIN | POLLPRI))
    1054              :                 {
    1055              :                         // Expected, don't have to check revents any further
    1056              :                 }
    1057              :                 else
    1058              :                 {
    1059              :                         TLOG(TLVL_DEBUG + 32) << GetTraceName() << "receive_acks_: Wrong event received from pollfd: " << pollfd_s.revents;
    1060              :                         break;
    1061              :                 }
    1062              : 
    1063              :                 MessHead mh;
    1064              :                 auto sts = read(send_fd_, &mh, sizeof(mh));
    1065              : 
    1066              :                 if (sts != sizeof(mh))
    1067              :                 {
    1068              :                         TLOG(TLVL_ERROR) << GetTraceName() << "receive_ack_: Wrong message header length received! (actual " << sts << " != " << sizeof(mh) << " expected)";
    1069              :                         continue;
    1070              :                 }
    1071              : 
    1072              :                 // check for "magic" and valid source_id(aka rank)
    1073              :                 mh.source_id = ntohs(mh.source_id);  // convert here as it is reference several times
    1074              :                 if (mh.source_id != my_rank)
    1075              :                 {
    1076              :                         TLOG(TLVL_ERROR) << GetTraceName() << "receive_ack_: Received ack for different sender! Rank=" << my_rank << ", hdr=" << mh.source_id;
    1077              :                         continue;
    1078              :                 }
    1079              :                 if (ntohl(mh.conn_magic) != ACK_MAGIC || !(mh.message_type == MessHead::ack_v0))  // Allow for future connect message versions
    1080              :                 {
    1081              :                         TLOG(TLVL_ERROR) << GetTraceName() << "receive_ack_: Wrong magic bytes in header!";
    1082              :                         continue;
    1083              :                 }
    1084              : 
    1085              :                 TLOG(TLVL_DEBUG + 47) << GetTraceName() << "receive_acks_: Received ack message, diff is now " << (send_ack_diff_.load() - 1);
    1086              :                 send_ack_diff_--;
    1087              :         }
    1088              : }
    1089              : 
    1090              : void artdaq::TCPSocketTransfer::send_ack_(int fd)
    1091              : {
    1092              :         MessHead mh = {0, MessHead::ack_v0, htons(source_rank()), {htonl(ACK_MAGIC)}};
    1093              :         write(fd, &mh, sizeof(mh));
    1094              : }
    1095              : #endif
    1096              : 
    1097            2 : void artdaq::TCPSocketTransfer::listen_(int port, size_t rcvbuf)
    1098              : {
    1099            2 :         int listen_fd = -1;
    1100            6 :         while (listen_thread_refcount_ > 0)
    1101              :         {
    1102            8 :                 TLOG(TLVL_DEBUG + 33) << "listen_: Listening/accepting new connections on port " << port;
    1103            4 :                 if (listen_fd == -1)
    1104              :                 {
    1105            4 :                         TLOG(TLVL_DEBUG + 32) << "listen_: Opening listener";
    1106            2 :                         listen_fd = TCP_listen_fd(port, rcvbuf);
    1107              :                 }
    1108            4 :                 if (listen_fd == -1)
    1109              :                 {
    1110            0 :                         TLOG(TLVL_DEBUG + 32) << "listen_: Error creating listen_fd!";
    1111            0 :                         break;
    1112              :                 }
    1113              : 
    1114              :                 int res;
    1115            4 :                 timeval tv = {2, 0};  // maybe increase of some global "debugging" flag set???
    1116              :                 fd_set rfds;
    1117           68 :                 FD_ZERO(&rfds);
    1118            4 :                 FD_SET(listen_fd, &rfds);  // NOLINT
    1119              : 
    1120            4 :                 res = select(listen_fd + 1, &rfds, static_cast<fd_set*>(nullptr), static_cast<fd_set*>(nullptr), &tv);
    1121            4 :                 if (res > 0)
    1122              :                 {
    1123              :                         int sts;
    1124              :                         sockaddr_un un;
    1125            1 :                         socklen_t arglen = sizeof(un);
    1126              :                         int fd;
    1127            2 :                         TLOG(TLVL_DEBUG + 32) << "listen_: Calling accept";
    1128            1 :                         fd = accept(listen_fd, reinterpret_cast<sockaddr*>(&un), &arglen);  // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
    1129            2 :                         TLOG(TLVL_DEBUG + 32) << "listen_: Done with accept";
    1130              : 
    1131            2 :                         TLOG(TLVL_DEBUG + 32) << "listen_: Reading connect message";
    1132            1 :                         socklen_t lenlen = sizeof(tv);
    1133              :                         /*sts=*/
    1134            1 :                         setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, lenlen);  // see man 7 socket.
    1135              :                         MessHead mh;
    1136            1 :                         uint64_t mark_us = TimeUtils::gettimeofday_us();
    1137            1 :                         sts = read(fd, &mh, sizeof(mh));
    1138            1 :                         uint64_t delta_us = TimeUtils::gettimeofday_us() - mark_us;
    1139            2 :                         TLOG(TLVL_DEBUG + 32) << "listen_: Read of connect message took " << delta_us << " microseconds.";
    1140            1 :                         if (sts != sizeof(mh))
    1141              :                         {
    1142            0 :                                 TLOG(TLVL_DEBUG + 32) << "listen_: Wrong message header length received!";
    1143            0 :                                 close(fd);
    1144            0 :                                 continue;
    1145            0 :                         }
    1146              : 
    1147              :                         // check for "magic" and valid source_id(aka rank)
    1148            1 :                         mh.source_id = ntohs(mh.source_id);                                                    // convert here as it is reference several times
    1149            1 :                         if (ntohl(mh.conn_magic) != CONN_MAGIC || !(mh.message_type == MessHead::connect_v0))  // Allow for future connect message versions
    1150              :                         {
    1151            0 :                                 TLOG(TLVL_DEBUG + 32) << "listen_: Wrong magic bytes in header!";
    1152            0 :                                 close(fd);
    1153            0 :                                 continue;
    1154            0 :                         }
    1155              : 
    1156              :                         // now add (new) connection
    1157            1 :                         std::lock_guard<std::mutex> lk(fd_mutex_);
    1158            1 :                         connected_fds_[mh.source_id].insert(fd);
    1159              : 
    1160            3 :                         TLOG(TLVL_INFO) << "listen_: New fd is " << fd << " for source rank " << mh.source_id;
    1161            1 :                 }
    1162              :                 else
    1163              :                 {
    1164            6 :                         TLOG(TLVL_DEBUG + 46) << "listen_: No connections in timeout interval!";
    1165              :                 }
    1166              :         }
    1167              : 
    1168            6 :         TLOG(TLVL_INFO) << "listen_: Shutting down connection listener";
    1169            2 :         if (listen_fd != -1)
    1170              :         {
    1171            2 :                 close(listen_fd);
    1172              :         }
    1173            2 :         std::lock_guard<std::mutex> lk(fd_mutex_);
    1174            2 :         auto it = connected_fds_.begin();
    1175            2 :         while (it != connected_fds_.end())
    1176              :         {
    1177            0 :                 auto& fd_set = it->second;
    1178            0 :                 auto rank_it = fd_set.begin();
    1179            0 :                 while (rank_it != fd_set.end())
    1180              :                 {
    1181            0 :                         close(*rank_it);
    1182            0 :                         rank_it = fd_set.erase(rank_it);
    1183              :                 }
    1184            0 :                 it = connected_fds_.erase(it);
    1185              :         }
    1186            2 :         first_fragment_received_.clear();
    1187              : 
    1188            2 : }  // do_connect_
    1189              : 
    1190           28 : size_t artdaq::TCPSocketTransfer::getConnectedFDCount_(int source_rank)
    1191              : {
    1192           28 :         std::lock_guard<std::mutex> lk(fd_mutex_);
    1193              : #ifndef __OPTIMIZE__
    1194           56 :         TLOG(TLVL_DEBUG + 45) << GetTraceName() << "getConnectedFDCount_: count is " << (connected_fds_.count(source_rank) != 0u ? connected_fds_[source_rank].size() : 0);
    1195              : #endif
    1196           56 :         return connected_fds_.count(source_rank) != 0u ? connected_fds_[source_rank].size() : 0;
    1197           28 : }
    1198              : 
    1199           29 : int artdaq::TCPSocketTransfer::getActiveFD_(int source_rank)
    1200              : {
    1201           29 :         std::lock_guard<std::mutex> lk(fd_mutex_);
    1202              : #ifndef __OPTIMIZE__
    1203           58 :         TLOG(TLVL_DEBUG + 45) << GetTraceName() << "getActiveFD_: fd is " << (active_receive_fds_.count(source_rank) != 0u ? active_receive_fds_[source_rank] : -1);
    1204              : #endif
    1205           58 :         return active_receive_fds_.count(source_rank) != 0u ? active_receive_fds_[source_rank] : -1;
    1206           29 : }
    1207            5 : void artdaq::TCPSocketTransfer::setActiveFD_(int source_rank, int fd)
    1208              : {
    1209            5 :         std::lock_guard<std::mutex> lk(fd_mutex_);
    1210              : #ifndef __OPTIMIZE__
    1211           10 :         TLOG(TLVL_DEBUG + 45) << GetTraceName() << "setActiveFD_: setting active fd for rank " << source_rank << " to " << fd;
    1212              : #endif
    1213            5 :         active_receive_fds_[source_rank] = fd;
    1214            5 : }
    1215            5 : int artdaq::TCPSocketTransfer::getLastActiveFD_(int source_rank)
    1216              : {
    1217            5 :         std::lock_guard<std::mutex> lk(fd_mutex_);
    1218              : #ifndef __OPTIMIZE__
    1219           10 :         TLOG(TLVL_DEBUG + 45) << GetTraceName() << "getLastActiveFD_: fd is " << (last_active_receive_fds_.count(source_rank) != 0u ? last_active_receive_fds_[source_rank] : -1);
    1220              : #endif
    1221           10 :         return last_active_receive_fds_.count(source_rank) != 0u ? last_active_receive_fds_[source_rank] : -1;
    1222            5 : }
    1223            2 : void artdaq::TCPSocketTransfer::setLastActiveFD_(int source_rank, int fd)
    1224              : {
    1225            2 :         std::lock_guard<std::mutex> lk(fd_mutex_);
    1226              : #ifndef __OPTIMIZE__
    1227            4 :         TLOG(TLVL_DEBUG + 45) << GetTraceName() << "setLastActiveFD_: setting last active fd for rank " << source_rank << " to " << fd;
    1228              : #endif
    1229            2 :         last_active_receive_fds_[source_rank] = fd;
    1230            2 : }
        

Generated by: LCOV version 2.0-1