Line data Source code
1 : #ifndef TCPSocketTransfer_hh
2 : #define TCPSocketTransfer_hh
3 : // This file (TCPSocketTransfer.hh) was created by Ron Rechenmacher <ron@fnal.gov> on
4 : // Sep 14, 2016. "TERMS AND CONDITIONS" governing this file are in the README
5 : // or COPYING file. If you do not have such a file, one can be obtained by
6 : // contacting Ron or Fermi Lab in Batavia IL, 60510, phone: 630-840-3000.
7 : // $RCSfile: .emacs.gnu,v $
8 : // rev="$Revision: 1.30 $$Date: 2016/03/01 14:27:27 $";
9 :
10 : // artdaq Includes
11 : #include "artdaq/DAQdata/HostMap.hh"
12 : #include "artdaq/TransferPlugins/TransferInterface.hh"
13 : #include "artdaq/TransferPlugins/detail/SRSockets.hh"
14 :
15 : // C Includes
16 : #include <sys/uio.h> // iovec
17 : #include <cstdint> // uint64_t
18 :
19 : // C++ Includes
20 : #include <boost/thread.hpp>
21 :
22 : #include <atomic>
23 : #include <chrono>
24 : #include <condition_variable>
25 : #include <map>
26 : #include <memory>
27 : #include <mutex>
28 : #include <string>
29 : #include <utility> // std::move()
30 :
31 : #ifndef USE_ACKS // Compile-time switch; a build may pre-define it before this header is included
32 : #define USE_ACKS 0 // Default 0: the receive_acks_()/send_ack_() declarations below are compiled out
33 : #endif
34 :
35 : namespace artdaq {
36 : class TCPSocketTransfer; // Forward declaration; the class is defined below using its qualified name
37 : }
38 :
39 : /**
40 : * \brief TransferInterface implementation plugin that sends and receives Fragments using TCP sockets
41 : */
42 : class artdaq::TCPSocketTransfer : public TransferInterface
43 : {
44 : public:
45 : /**
46 : * \brief TCPSocketTransfer Constructor
47 : * \param ps ParameterSet used to configure TCPSocketTransfer
48 : * \param role Role of this TCPSocketTransfer instance (kSend or kReceive)
49 : *
50 : * \verbatim
51 : * TCPSocketTransfer accepts the following Parameters:
52 : * "tcp_receive_buffer_size" (Default: 0): The TCP buffer size on the receive socket
53 : * "send_retry_timeout_us" (Default: 1000000): Microseconds between send retries (infinite retries for moveFragment, up to send_timeout_us for copyFragment). NOTE(review): moveFragment/copyFragment are not declared in this header; presumably older names for the reliable / min-blocking transfer methods -- confirm
54 : * "host_map" (REQUIRED): List of FHiCL tables containing information about other hosts in the system.
55 : * Each table should contain:
56 : * "rank" (Default: RECV_TIMEOUT): Rank of this host
57 : * "host" (Default: "localhost"): Hostname of this host
58 : * "portOffset" (Default: 5500): To avoid collisions, each destination should specify its own port offset.
59 : * All TCPSocketTransfers sending to that destination will add their own rank to make a unique port number.
60 : * \endverbatim
61 : * TCPSocketTransfer also requires all Parameters for configuring a TransferInterface
62 : */
63 : TCPSocketTransfer(fhicl::ParameterSet const& ps, Role role);
64 :
65 : virtual ~TCPSocketTransfer() noexcept; // Destructor; defined out of line (implementation not visible in this header)
66 :
67 : /**
68 : * \brief Receive a Fragment Header from the transport mechanism
69 : * \param[out] header Received Fragment Header
70 : * \param timeout_usec Timeout for receive, in microseconds
71 : * \return The rank the Fragment was received from (should be source_rank), or RECV_TIMEOUT
72 : */
73 : int receiveFragmentHeader(detail::RawFragmentHeader& header, size_t timeout_usec) override;
74 :
75 : /**
76 : * \brief Receive the body of a Fragment to the given destination pointer
77 : * \param destination Pointer to memory region where Fragment data should be stored
78 : * \param wordCount Number of RawDataType words to receive
79 : * \return The rank the Fragment was received from (should be source_rank), or RECV_TIMEOUT
80 : */
81 : int receiveFragmentData(RawDataType* destination, size_t wordCount) override;
82 :
83 : /**
84 : * \brief Transfer a Fragment to the destination. May not necessarily be reliable, but will not block longer than timeout_usec. A copy of frag is made and handed to sendFragment_, so the caller's Fragment is left untouched.
85 : * \param frag Fragment to transfer (copied, not consumed)
86 : * \param timeout_usec Timeout for send, in microseconds
87 : * \return CopyStatus detailing result of transfer
88 : */
89 0 : CopyStatus transfer_fragment_min_blocking_mode(Fragment const& frag, size_t timeout_usec) override { return sendFragment_(Fragment(frag), timeout_usec); }
90 :
91 : /**
92 : * \brief Transfer a Fragment to the destination. This should be reliable, if the underlying transport mechanism supports reliable sending. Forwards to sendFragment_ with a timeout argument of 0.
93 : * \param frag Fragment to transfer (moved into sendFragment_; do not use it afterwards)
94 : * \return CopyStatus detailing result of transfer
95 : */
96 2 : CopyStatus transfer_fragment_reliable_mode(Fragment&& frag) override { return sendFragment_(std::move(frag), 0); }
97 :
98 : /**
99 : * \brief Determine whether the TransferInterface plugin is able to send/receive data
100 : * \return True if the TransferInterface plugin is currently able to send/receive data
101 : */
102 : bool isRunning() override;
103 :
104 : /**
105 : * \brief Flush any in-flight data. This should be used by the receiver after the receive loop has
106 : * ended.
107 : */
108 : void flush_buffers() override;
109 :
110 : private:
111 : static std::atomic<int> listen_thread_refcount_; // Static counter; presumably the number of instances using listen_thread_ -- confirm in the implementation
112 : static std::mutex listen_thread_mutex_; // Presumably guards creation/teardown of listen_thread_ -- confirm
113 : static std::unique_ptr<boost::thread> listen_thread_; // Static, so a single thread object is shared by every instance of this class
114 : static std::map<int, std::set<int>> connected_fds_; // Presumably source rank -> connected socket fds -- confirm. NOTE(review): std::set is used but <set> is not among this header's direct includes; consider adding #include <set>
115 : static std::map<int, std::set<int>> first_fragment_received_; // Presumably source rank -> fds that have delivered a first Fragment -- confirm
116 : static std::mutex fd_mutex_; // Presumably guards the static fd maps above -- confirm
117 : int send_fd_; // Presumably the sender-side socket file descriptor -- confirm
118 : std::map<int, int> active_receive_fds_; // Accessed through getActiveFD_/setActiveFD_ (keyed by source_rank per their signatures) -- confirm
119 : std::map<int, int> last_active_receive_fds_; // Accessed through getLastActiveFD_/setLastActiveFD_ -- confirm
120 :
121 : union // Anonymous union: mh and mha occupy the same storage, exposing the MessHead as raw bytes
122 : {
123 : MessHead mh; // Structured view of the message header
124 : uint8_t mha[sizeof(MessHead)]; // Byte-array view covering exactly sizeof(MessHead) bytes
125 : };
126 :
127 : enum class SocketState // NOTE(review): not referenced elsewhere in this header; presumably used by the implementation file -- confirm
128 : {
129 : Metadata,
130 : Data
131 : };
132 :
133 : int port_; // Presumably the TCP port used by this instance -- confirm
134 : size_t rcvbuf_; // Presumably the "tcp_receive_buffer_size" parameter -- confirm
135 : size_t sndbuf_; // Presumably the send-side socket buffer size; no matching parameter is documented on the constructor -- confirm
136 : size_t send_retry_timeout_us_; // Presumably the "send_retry_timeout_us" parameter (microseconds) -- confirm
137 :
138 : hostMap_t hostMap_; // Presumably built from the "host_map" parameter -- confirm
139 :
140 : volatile unsigned connect_state : 1; // 0=not "connected" (initial msg not sent). NOTE(review): volatile provides no inter-thread synchronization; confirm this flag is not shared across threads
141 : unsigned blocking : 1; // compatible with bool (true/false)
142 :
143 : bool connection_was_lost_; // Presumably set when an established connection drops -- confirm
144 :
145 : bool timeoutMessageArmed_; // don't repeatedly print about the send fd not being open...
146 : std::chrono::steady_clock::time_point last_recv_time_; // Time of last successful receive
147 : double receive_disconnected_wait_s_; // How long to wait between messages before returning DATA_END
148 : size_t receive_err_wait_us_; // Amount of time to wait if there are no connected receive sockets
149 : std::atomic<bool> receive_socket_has_been_connected_; // Whether the receiver has ever been connected to a sender
150 : std::atomic<int> send_ack_diff_; // Number of sends - number of acks received. Not allowed to exceed buffer_count.
151 : std::unique_ptr<boost::thread> ack_listen_thread_; // Thread to listen for ack messages on the sender
152 :
153 : private: // methods
154 : TCPSocketTransfer(TCPSocketTransfer const&) = delete; // Instances are non-copyable...
155 : TCPSocketTransfer(TCPSocketTransfer&&) = delete; // ...and non-movable
156 : TCPSocketTransfer& operator=(TCPSocketTransfer const&) = delete;
157 : TCPSocketTransfer& operator=(TCPSocketTransfer&&) = delete;
158 :
159 : CopyStatus sendFragment_(Fragment&& frag, size_t timeout_usec); // Common send path for both public transfer modes; reliable mode passes timeout_usec == 0
160 :
161 : CopyStatus sendData_(const void* buf, size_t bytes, size_t send_timeout_usec, bool isHeader = false); // Send from a single contiguous buffer of the given size in bytes
162 :
163 : CopyStatus sendData_(const struct iovec* iov, int iovcnt, size_t send_timeout_usec, bool isHeader = false); // Scatter/gather overload taking iovcnt iovec entries
164 :
165 : #if USE_ACKS // Ack support is only declared when USE_ACKS is nonzero (it defaults to 0 near the top of this header)
166 : void receive_acks_();
167 : void send_ack_(int fd);
168 : #endif
169 :
170 : // Sender is responsible for connecting to receiver
171 : void connect_();
172 :
173 : void reconnect_(); // Presumably re-establishes the sender connection after a loss -- confirm
174 :
175 : void disconnect_receive_socket_(const std::string& msg = ""); // msg is optional and defaults to an empty string; presumably used for logging -- confirm
176 :
177 : // Receiver should listen for connections
178 : void start_listen_thread_();
179 : static void listen_(int port, size_t rcvbuf); // Static: takes port and rcvbuf as arguments instead of reading instance state; presumably the listen thread's entry point -- confirm
180 :
181 : size_t getConnectedFDCount_(int source_rank); // Presumably the number of fds in connected_fds_ for source_rank -- confirm
182 : int getActiveFD_(int source_rank);
183 : void setActiveFD_(int source_rank, int fd);
184 : int getLastActiveFD_(int source_rank);
185 : void setLastActiveFD_(int source_rank, int fd);
186 : };
187 :
188 : #endif // TCPSocketTransfer_hh
|