1 #include "otsdaq/NetworkUtilities/TransceiverSocket.h"
2 #include "otsdaq/Macros/CoutMacros.h"
3 #include "otsdaq/MessageFacility/MessageFacility.h"
// Default constructor (no arguments). The only visible statement streams
// the fixed text "TransceiverSocket constructor " to the __COUT__ logging macro.
14 TransceiverSocket::TransceiverSocket(
void)
16 __COUT__ <<
"TransceiverSocket constructor " << __E__;
// Constructor taking an IP address string and a port number.
// The only visible statement logs "<IPAddress>:<port>" through __COUT__;
// how the two arguments are forwarded to base classes is not shown here.
20 TransceiverSocket::TransceiverSocket(std::string IPAddress,
unsigned int port)
23 __COUT__ <<
"TransceiverSocket constructor " << IPAddress <<
":" << port << __E__;
// Destructor: empty body, so this class adds no cleanup of its own.
27 TransceiverSocket::~TransceiverSocket(
void) {}
34 unsigned int interPacketGapUSeconds )
// NOTE(review): the head of this signature is not shown; judging by the trace
// text below this is presumably TransceiverSocket::acknowledge -- confirm.
// Hold sendMutex_ until the end of the enclosing scope so that sends on this
// object do not interleave.
37 std::lock_guard<std::mutex> lock(sendMutex_);
// Trace the socket descriptor, this socket's port, and the port stored in
// ReceiverSocket::fromAddress_ (both converted from network byte order).
40 __COUTT__ <<
"Acknowledging on Socket Descriptor #: " << socketNumber_
41 <<
" from-port: " << ntohs(socketAddress_.sin_port)
42 <<
" to-port: " << ntohs(ReceiverSocket::fromAddress_.sin_port)
// Clamp the caller's maxChunkSize to at most 65500
// (presumably to stay below the UDP datagram size limit -- confirm).
45 const size_t MAX_SEND_SIZE =
46 maxChunkSize > 65500u ?
static_cast<size_t>(65500u) : maxChunkSize;
// Keep sending while unsent data remains and the previous sendto() result
// was positive.
52 while(offset < buffer.size() && sendToSize > 0)
// Size of this chunk: the remaining byte count when it does not exceed
// MAX_SEND_SIZE; the value used when it does exceed it is on a line not shown
// here. sizeInBytes is defined outside the visible lines (presumably the byte
// size of one buffer element -- confirm).
54 auto thisSize = sizeInBytes * (buffer.size() - offset) > MAX_SEND_SIZE
56 : sizeInBytes * (buffer.size() - offset);
// Transmit the chunk to the address held in ReceiverSocket::fromAddress_.
59 sendToSize = sendto(socketNumber_,
63 (
struct sockaddr*)&(ReceiverSocket::fromAddress_),
// Advance the element offset by the number of elements just sent.
65 offset += sendToSize / sizeInBytes;
// Optional pause between chunks: only when a gap was requested, more data
// remains, and the last send reported a positive size.
66 if(interPacketGapUSeconds > 0 && offset < buffer.size() && sendToSize > 0)
67 usleep(interPacketGapUSeconds);
// Build the error text with this socket's port and the errno description;
// the condition that guards this branch is not shown here.
72 __SS__ <<
"Error writing buffer from port "
73 << ntohs(TransmitterSocket::socketAddress_.sin_port) <<
": "
74 << strerror(errno) << std::endl;
87 const std::string& sendBuffer,
88 unsigned int timeoutSeconds ,
89 unsigned int timeoutUSeconds ,
91 unsigned int interPacketTimeoutUSeconds )
// NOTE(review): the head of this signature is not shown; the trace text below
// names the routine "sendAndReceive".
// A monotonic clock is used for the elapsed-time figures in the trace output.
93 using clock = std::chrono::steady_clock;
94 auto start = clock::now();
// Hold sendAndReceiveMutex_ until the end of the enclosing scope, so the send
// and the receives below are not interleaved with another caller's exchange
// on this object.
97 std::lock_guard<std::mutex> lock(
98 sendAndReceiveMutex_);
// Transmit the request to the target socket.
101 send(toSocket, sendBuffer, verbose);
// Trace elapsed milliseconds after the send, plus process and thread ids.
103 __COUTT__ <<
" ----> Time sendAndReceive '" << sendBuffer
104 <<
"' (socketNumber=" << socketNumber_ <<
") check ==> "
105 << std::chrono::duration_cast<std::chrono::milliseconds>(clock::now() -
108 <<
" milliseconds. PID=" << getpid()
109 <<
" TID=" << std::this_thread::get_id() << std::endl;
// Wait for the first response packet using the caller's timeout; a negative
// return from receive() takes the error branch below.
111 std::string receiveBuffer;
112 if(
receive(receiveBuffer, timeoutSeconds, timeoutUSeconds, verbose) < 0)
// Error text reports the timeout in seconds and both endpoints
// (remote ip:port, then this socket's ip:port).
114 __SS__ <<
"Timeout (" << timeoutSeconds + timeoutUSeconds / 1000000.
115 <<
" s) or Error receiving response buffer from remote ip:port "
116 << toSocket.getIPAddress() <<
":" << toSocket.getPort()
117 <<
" to this ip:port " << Socket::getIPAddress() <<
":"
118 << Socket::getPort() << __E__;
// Trace the size of the first response and the elapsed time.
122 __COUTT__ <<
" ----> Time sendAndReceive '" << sendBuffer <<
"' got "
123 << receiveBuffer.size() <<
" (socketNumber=" << socketNumber_
125 << std::chrono::duration_cast<std::chrono::milliseconds>(clock::now() -
128 <<
" milliseconds. PID=" << getpid()
129 <<
" TID=" << std::this_thread::get_id() << std::endl;
// Scratch buffer for follow-up data. The visible condition selects
// interPacketTimeoutUSeconds unless timeoutSeconds is 0 and timeoutUSeconds
// is smaller than it; the value used in that case, and the call consuming
// it, are on lines not shown here.
132 std::string receiveBuffer2;
135 (timeoutSeconds == 0 && timeoutUSeconds < interPacketTimeoutUSeconds)
137 : interPacketTimeoutUSeconds,
// Append the follow-up data to the accumulated response.
140 receiveBuffer += receiveBuffer2;
142 __COUTT__ <<
" ----> Time sendAndReceive +" << receiveBuffer2.size()
144 << std::chrono::duration_cast<std::chrono::milliseconds>(clock::now() -
147 <<
" milliseconds." << std::endl;
// Final trace: accumulated response size and elapsed time.
150 __COUTT__ <<
" ----> Time sendAndReceive " << receiveBuffer.size() <<
" check ==> "
151 << std::chrono::duration_cast<std::chrono::milliseconds>(clock::now() -
154 <<
" milliseconds." << std::endl;
// Hand back everything accumulated in receiveBuffer.
156 return receiveBuffer;
int receive(std::string &buffer, unsigned int timeoutSeconds=1, unsigned int timeoutUSeconds=0, bool verbose=false)
returns count of dropped packets
int acknowledge(const std::string &buffer, bool verbose=false, size_t maxChunkSize=1500, unsigned int interPacketGapUSeconds=0)
acknowledge() sends its response to the address from which the last packet was received
std::string sendAndReceive(Socket &toSocket, const std::string &sendBuffer, unsigned int timeoutSeconds=1, unsigned int timeoutUSeconds=0, bool verbose=false, unsigned int interPacketTimeoutUSeconds=10000)
These defines are also used by OtsConfigurationWizardSupervisor