LCOV - code coverage report
Current view: top level - artdaq/TransferPlugins - TCPSocketTransfer.hh (source / functions) Coverage Total Hit
Test: artdaq.info.cleaned Lines: 50.0 % 2 1
Test Date: 2025-09-04 00:45:34 Functions: 50.0 % 2 1

            Line data    Source code
       1              : #ifndef TCPSocketTransfer_hh
       2              : #define TCPSocketTransfer_hh
       3              : // This file (TCPSocketTransfer.hh) was created by Ron Rechenmacher <ron@fnal.gov> on
       4              : // Sep 14, 2016. "TERMS AND CONDITIONS" governing this file are in the README
       5              : // or COPYING file. If you do not have such a file, one can be obtained by
       6              : // contacting Ron or Fermi Lab in Batavia IL, 60510, phone: 630-840-3000.
       7              : // $RCSfile: .emacs.gnu,v $
       8              : // rev="$Revision: 1.30 $$Date: 2016/03/01 14:27:27 $";
       9              : 
      10              : // artdaq Includes
      11              : #include "artdaq/DAQdata/HostMap.hh"
      12              : #include "artdaq/TransferPlugins/TransferInterface.hh"
      13              : #include "artdaq/TransferPlugins/detail/SRSockets.hh"
      14              : 
      15              : // C Includes
      16              : #include <sys/uio.h>  // iovec
      17              : #include <cstdint>    // uint64_t
      18              : 
      19              : // C++ Includes
      20              : #include <boost/thread.hpp>
      21              : 
      22              : #include <atomic>
      23              : #include <chrono>
      24              : #include <condition_variable>
      25              : #include <map>
      26              : #include <memory>
      27              : #include <mutex>
      28              : #include <string>
      29              : #include <utility>  // std::move()
      30              : 
      31              : #ifndef USE_ACKS  // honour a value already defined before this header is included
      32              : #define USE_ACKS 0  // default: declarations inside `#if USE_ACKS` are compiled out
      33              : #endif
      34              : 
      35              : namespace artdaq {
      36              : class TCPSocketTransfer;  // forward declaration; the class body follows, using the qualified name
      37              : }  // namespace artdaq
      38              : 
      39              : /**
      40              :  * \brief TransferInterface implementation plugin that sends data using TCP sockets
      41              :  */
      42              : class artdaq::TCPSocketTransfer : public TransferInterface
      43              : {
      44              : public:
      45              :         /**
      46              :          * \brief TCPSocketTransfer Constructor
      47              :          * \param ps ParameterSet used to configure TCPSocketTransfer
      48              :          * \param role Role of this TCPSocketTransfer instance (kSend or kReceive)
      49              :          *
      50              :          * \verbatim
      51              :          * TCPSocketTransfer accepts the following Parameters:
      52              :          * "tcp_receive_buffer_size" (Default: 0): The TCP buffer size on the receive socket
      53              :          * "send_retry_timeout_us" (Default: 1000000): Microseconds between send retries (infinite retries for transfer_fragment_reliable_mode, up to timeout_usec for transfer_fragment_min_blocking_mode)
      54              :          * "host_map" (REQUIRED): List of FHiCL tables containing information about other hosts in the system.
      55              :          *   Each table should contain:
      56              :          *   "rank" (Default: RECV_TIMEOUT): Rank of this host
      57              :          *   "host" (Default: "localhost"): Hostname of this host
      58              :          *   "portOffset" (Default: 5500): To avoid collisions, each destination should specify its own port offset.
      59              :          *     All TCPSocketTransfers sending to that destination will add their own rank to make a unique port number.
      60              :          * \endverbatim
      61              :          * TCPSocketTransfer also requires all Parameters for configuring a TransferInterface
      62              :          */
      63              :         TCPSocketTransfer(fhicl::ParameterSet const& ps, Role role);
      64              : 
      65              :         virtual ~TCPSocketTransfer() noexcept;
      66              : 
      67              :         /**
      68              :          * \brief Receive a Fragment Header from the transport mechanism
      69              :          * \param[out] header Received Fragment Header
      70              :          * \param timeout_usec Timeout for receive
      71              :          * \return The rank the Fragment was received from (should be source_rank), or RECV_TIMEOUT
      72              :          */
      73              :         int receiveFragmentHeader(detail::RawFragmentHeader& header, size_t timeout_usec) override;
      74              : 
      75              :         /**
      76              :          * \brief Receive the body of a Fragment to the given destination pointer
      77              :          * \param destination Pointer to memory region where Fragment data should be stored
      78              :          * \param wordCount Number of RawDataType words to receive
      79              :          * \return The rank the Fragment was received from (should be source_rank), or RECV_TIMEOUT
      80              :          */
      81              :         int receiveFragmentData(RawDataType* destination, size_t wordCount) override;
      82              : 
      83              :         /**
      84              :          * \brief Transfer a Fragment to the destination. May not necessarily be reliable, but will not block longer than timeout_usec.
      85              :          * \param frag Fragment to transfer
      86              :          * \param timeout_usec Timeout for send, in microseconds
      87              :          * \return CopyStatus detailing result of transfer
      88              :          */
      89            0 :         CopyStatus transfer_fragment_min_blocking_mode(Fragment const& frag, size_t timeout_usec) override { return sendFragment_(Fragment(frag), timeout_usec); }  // copies frag because sendFragment_ takes a Fragment&&
      90              : 
      91              :         /**
      92              :          * \brief Transfer a Fragment to the destination. This should be reliable, if the underlying transport mechanism supports reliable sending
      93              :          * \param frag Fragment to transfer
      94              :          * \return CopyStatus detailing result of transfer
      95              :          */
      96            2 :         CopyStatus transfer_fragment_reliable_mode(Fragment&& frag) override { return sendFragment_(std::move(frag), 0); }  // passes 0 as timeout_usec; presumably "no timeout" -- confirm in sendFragment_
      97              : 
      98              :         /**
      99              :          * \brief Determine whether the TransferInterface plugin is able to send/receive data
     100              :          * \return True if the TransferInterface plugin is currently able to send/receive data
     101              :          */
     102              :         bool isRunning() override;
     103              : 
     104              :         /**
     105              :          * \brief Flush any in-flight data. This should be used by the receiver after the receive loop has
     106              :          * ended.
     107              :          */
     108              :         void flush_buffers() override;
     109              : 
     110              : private:
     111              :         static std::atomic<int> listen_thread_refcount_;  // shared by all instances (static); presumably counts users of listen_thread_ -- confirm in the .cc
     112              :         static std::mutex listen_thread_mutex_;  // presumably guards listen_thread_ -- confirm in the .cc
     113              :         static std::unique_ptr<boost::thread> listen_thread_;  // one listener thread object shared by all instances (static)
     114              :         static std::map<int, std::set<int>> connected_fds_;  // NOTE(review): std::set is used here, but this header does not include <set> directly
     115              :         static std::map<int, std::set<int>> first_fragment_received_;
     116              :         static std::mutex fd_mutex_;  // presumably guards the static fd maps above -- confirm in the .cc
     117              :         int send_fd_;
     118              :         std::map<int, int> active_receive_fds_;  // presumably source rank -> fd, per getActiveFD_/setActiveFD_ signatures -- confirm
     119              :         std::map<int, int> last_active_receive_fds_;  // presumably source rank -> fd, per getLastActiveFD_/setLastActiveFD_ signatures -- confirm
     120              : 
     121              :         union  // anonymous union: the same MessHead storage viewed as a struct (mh) or as raw bytes (mha)
     122              :         {
     123              :                 MessHead mh;
     124              :                 uint8_t mha[sizeof(MessHead)];
     125              :         };
     126              : 
     127              :         enum class SocketState  // NOTE(review): not referenced anywhere else in this header
     128              :         {
     129              :                 Metadata,
     130              :                 Data
     131              :         };
     132              : 
     133              :         int port_;
     134              :         size_t rcvbuf_;
     135              :         size_t sndbuf_;
     136              :         size_t send_retry_timeout_us_;
     137              : 
     138              :         hostMap_t hostMap_;
     139              : 
     140              :         volatile unsigned connect_state : 1;  // 0=not "connected" (initial msg not sent)
     141              :         unsigned blocking : 1;                // compatible with bool (true/false)
     142              : 
     143              :         bool connection_was_lost_;
     144              : 
     145              :         bool timeoutMessageArmed_;                              // don't repeatedly print about the send fd not being open...
     146              :         std::chrono::steady_clock::time_point last_recv_time_;  // Time of last successful receive
     147              :         double receive_disconnected_wait_s_;                    // How long to wait between messages before returning DATA_END
     148              :         size_t receive_err_wait_us_;                            // Amount of time to wait if there are no connected receive sockets
     149              :         std::atomic<bool> receive_socket_has_been_connected_;   // Whether the receiver has ever been connected to a sender
     150              :         std::atomic<int> send_ack_diff_;                        // Number of sends - number of acks received. Not allowed to exceed buffer_count.
     151              :         std::unique_ptr<boost::thread> ack_listen_thread_;      // Thread to listen for ack messages on the sender
     152              : 
     153              : private:  // methods
     154              :         TCPSocketTransfer(TCPSocketTransfer const&) = delete;  // non-copyable and non-movable (all four operations deleted)
     155              :         TCPSocketTransfer(TCPSocketTransfer&&) = delete;
     156              :         TCPSocketTransfer& operator=(TCPSocketTransfer const&) = delete;
     157              :         TCPSocketTransfer& operator=(TCPSocketTransfer&&) = delete;
     158              : 
     159              :         CopyStatus sendFragment_(Fragment&& frag, size_t timeout_usec);  // common send path used by both public transfer_fragment_* methods
     160              : 
     161              :         CopyStatus sendData_(const void* buf, size_t bytes, size_t send_timeout_usec, bool isHeader = false);  // single contiguous buffer overload
     162              : 
     163              :         CopyStatus sendData_(const struct iovec* iov, int iovcnt, size_t send_timeout_usec, bool isHeader = false);  // iovec array overload
     164              : 
     165              : #if USE_ACKS  // ack helpers are declared only when USE_ACKS is non-zero
     166              :         void receive_acks_();
     167              :         void send_ack_(int fd);
     168              : #endif
     169              : 
     170              :         // Sender is responsible for connecting to receiver
     171              :         void connect_();
     172              : 
     173              :         void reconnect_();
     174              : 
     175              :         void disconnect_receive_socket_(const std::string& msg = "");
     176              : 
     177              :         // Receiver should listen for connections
     178              :         void start_listen_thread_();
     179              :         static void listen_(int port, size_t rcvbuf);  // static: receives port and rcvbuf as arguments instead of reading instance members
     180              : 
     181              :         size_t getConnectedFDCount_(int source_rank);
     182              :         int getActiveFD_(int source_rank);
     183              :         void setActiveFD_(int source_rank, int fd);
     184              :         int getLastActiveFD_(int source_rank);
     185              :         void setLastActiveFD_(int source_rank, int fd);
     186              : };
     187              : 
     188              : #endif  // TCPSocketTransfer_hh
        

Generated by: LCOV version 2.0-1