1 #include "otsdaq/NetworkUtilities/ReceiverSocket.h"
2 #include "otsdaq/Macros/CoutMacros.h"
3 #include "otsdaq/MessageFacility/MessageFacility.h"
4 #include "otsdaq/NetworkUtilities/NetworkConverters.h"
10 #include <arpa/inet.h>
19 , addressLength_(sizeof(fromAddress_))
23 __COUT__ <<
"ReceiverSocket constructor " << IPAddress <<
":" << port << __E__;
28 ReceiverSocket::ReceiverSocket(
void)
29 : addressLength_(sizeof(fromAddress_)), numberOfBytes_(0), readCounter_(0)
31 __COUT__ <<
"ReceiverSocket constructor" << __E__;
35 ReceiverSocket::~ReceiverSocket(
void) {}
38 std::string ReceiverSocket::getLastIncomingIPAddress(
void)
41 for(
int i = 0; i < 4; i++)
43 fromIP += std::to_string((lastIncomingIPAddress_ << (i * 8)) & 0xff);
51 unsigned short ReceiverSocket::getLastIncomingPort(
void)
53 return ntohs(lastIncomingPort_);
58 unsigned int timeoutSeconds,
59 unsigned int timeoutUSeconds,
63 lastIncomingIPAddress_,
75 unsigned long& fromIPAddress,
76 unsigned short& fromPort,
77 unsigned int timeoutSeconds,
78 unsigned int timeoutUSeconds,
81 using clock = std::chrono::steady_clock;
82 auto start = clock::now();
85 std::lock_guard<std::mutex> lock(receiveMutex_);
87 __COUTT__ <<
" ----> Time receive check (socketNumber=" << socketNumber_ <<
") ==> "
88 << std::chrono::duration_cast<std::chrono::milliseconds>(clock::now() -
91 <<
" milliseconds. PID=" << getpid()
92 <<
" TID=" << std::this_thread::get_id() << std::endl;
95 timeout_.tv_sec = timeoutSeconds;
96 timeout_.tv_usec = timeoutUSeconds;
98 FD_ZERO(&fileDescriptor_);
99 FD_SET(socketNumber_, &fileDescriptor_);
100 auto rc = select(socketNumber_ + 1, &fileDescriptor_, 0, 0, &timeout_);
102 if(rc < 0 && errno == EINTR)
103 __COUTT__ <<
"select interrupted by signal" << std::endl;
105 __COUTT__ <<
" ----> Time receive (socketNumber=" << socketNumber_ <<
", rc=" << rc
106 <<
", errno=" << errno <<
", timeout=" << timeoutSeconds <<
" "
107 << timeoutUSeconds <<
") check ==> "
108 << std::chrono::duration_cast<std::chrono::milliseconds>(clock::now() -
111 <<
" milliseconds. PID=" << getpid()
112 <<
" TID=" << std::this_thread::get_id() << std::endl;
114 if(FD_ISSET(socketNumber_, &fileDescriptor_))
116 __COUTT__ <<
" ----> Time receive check ==> "
117 << std::chrono::duration_cast<std::chrono::milliseconds>(clock::now() -
120 <<
" milliseconds." << std::endl;
122 buffer.resize(maxSocketSize_);
126 __COUTT__ <<
" ----> Time receive check ==> "
127 << std::chrono::duration_cast<std::chrono::milliseconds>(clock::now() -
130 <<
" milliseconds." << std::endl;
132 if((numberOfBytes_ = recvfrom(socketNumber_,
136 (
struct sockaddr*)&fromAddress_,
137 &addressLength_)) == -1)
139 __COUT__ <<
"At socket with IPAddress: " << getIPAddress()
140 <<
" port: " << getPort() << std::endl;
141 __SS__ <<
"Error reading buffer from\tIP:\t";
142 std::string fromIP = inet_ntoa(fromAddress_.sin_addr);
143 fromIPAddress = fromAddress_.sin_addr.s_addr;
144 fromPort = fromAddress_.sin_port;
145 lastIncomingIPAddress_ = fromIPAddress;
146 lastIncomingPort_ = fromPort;
148 for(
int i = 0; i < 4; i++)
150 ss << ((fromIPAddress << (i * 8)) & 0xff);
154 ss <<
"\tPort\t" << ntohs(fromPort) <<
" IP " << fromIP << std::endl;
155 __COUT__ <<
"\n" << ss.str();
160 fromIPAddress = fromAddress_.sin_addr.s_addr;
161 fromPort = fromAddress_.sin_port;
162 lastIncomingIPAddress_ = fromIPAddress;
163 lastIncomingPort_ = fromPort;
165 __COUTT__ <<
" ----> Time receive " << numberOfBytes_ <<
" check ==> "
166 << std::chrono::duration_cast<std::chrono::milliseconds>(clock::now() -
169 <<
" milliseconds." << std::endl;
171 __COUTS__(2) <<
"IP: " << std::hex << fromIPAddress << std::dec
172 <<
" port: " << fromPort << std::endl
173 <<
"Socket Number: " << socketNumber_
174 <<
" number of bytes received: " << numberOfBytes_ << std::endl;
178 buffer.resize(numberOfBytes_);
183 std::string fromIP = inet_ntoa(fromAddress_.sin_addr);
185 __COUT__ <<
"Receiving "
186 <<
" at: " << getIPAddress() <<
":" << getPort()
187 <<
" from: " << fromIP <<
":" << ntohs(fromPort)
188 <<
" size: " << buffer.size() << std::endl;
192 std::stringstream ss;
195 for(uint32_t i = begin; i < buffer.size(); i++)
199 else if(i == begin + 10)
201 ss << std::setfill(
'0') << std::setw(2) << std::hex
202 << (((int16_t)buffer[i]) & 0xFF) <<
"-" << std::dec;
205 __COUTS__(2) << ss.str();
214 __COUT__ <<
"No new messages for "
215 << timeoutSeconds + timeoutUSeconds / 1000000. <<
"s (Total "
216 << readCounter_ * (timeoutSeconds + timeoutUSeconds / 1000000.)
217 <<
"s). Read request timed out receiving on "
218 <<
" " << getIPAddress() <<
":" << getPort() << std::endl;
222 __COUTT__ <<
" ----> Time receive check ==> "
223 << std::chrono::duration_cast<std::chrono::milliseconds>(clock::now() -
226 <<
" milliseconds." << std::endl;
233 unsigned int timeoutSeconds,
234 unsigned int timeoutUSeconds,
238 lastIncomingIPAddress_,
250 unsigned long& fromIPAddress,
251 unsigned short& fromPort,
252 unsigned int timeoutSeconds,
253 unsigned int timeoutUSeconds,
256 using clock = std::chrono::steady_clock;
257 auto start = clock::now();
260 std::lock_guard<std::mutex> lock(receiveMutex_);
262 __COUTT__ <<
" ----> Time receive (socketNumber=" << socketNumber_ <<
") check ==> "
263 << std::chrono::duration_cast<std::chrono::milliseconds>(clock::now() -
266 <<
" milliseconds. PID=" << getpid()
267 <<
" TID=" << std::this_thread::get_id() << std::endl;
270 timeout_.tv_sec = timeoutSeconds;
271 timeout_.tv_usec = timeoutUSeconds;
273 FD_ZERO(&fileDescriptor_);
274 FD_SET(socketNumber_, &fileDescriptor_);
275 select(socketNumber_ + 1, &fileDescriptor_, 0, 0, &timeout_);
277 __COUTT__ <<
" ----> Time receive (socketNumber=" << socketNumber_ <<
") check ==> "
278 << std::chrono::duration_cast<std::chrono::milliseconds>(clock::now() -
281 <<
" milliseconds. PID=" << getpid()
282 <<
" TID=" << std::this_thread::get_id() << std::endl;
284 if(FD_ISSET(socketNumber_, &fileDescriptor_))
286 buffer.resize(maxSocketSize_ /
sizeof(uint32_t));
292 if((numberOfBytes_ = recvfrom(socketNumber_,
296 (
struct sockaddr*)&fromAddress_,
297 &addressLength_)) == -1)
299 __COUT__ <<
"At socket with IPAddress: " << getIPAddress()
300 <<
" port: " << getPort() << std::endl;
301 __SS__ <<
"Error reading buffer from\tIP:\t";
302 std::string fromIP = inet_ntoa(fromAddress_.sin_addr);
303 fromIPAddress = fromAddress_.sin_addr.s_addr;
304 fromPort = fromAddress_.sin_port;
305 lastIncomingIPAddress_ = fromIPAddress;
306 lastIncomingPort_ = fromPort;
308 for(
int i = 0; i < 4; i++)
310 ss << ((fromIPAddress << (i * 8)) & 0xff);
314 ss <<
"\tPort\t" << ntohs(fromPort) <<
" IP " << fromIP << std::endl;
315 __COUT__ <<
"\n" << ss.str();
320 fromIPAddress = fromAddress_.sin_addr.s_addr;
321 fromPort = fromAddress_.sin_port;
322 lastIncomingIPAddress_ = fromIPAddress;
323 lastIncomingPort_ = fromPort;
325 __COUTS__(2) << __PRETTY_FUNCTION__ <<
"IP: " << std::hex << fromIPAddress
326 << std::dec <<
" port: " << fromPort << std::endl
327 <<
"Socket Number: " << socketNumber_
328 <<
" number of bytes: " << numberOfBytes_ << std::endl;
332 buffer.resize(numberOfBytes_ /
sizeof(uint32_t));
338 struct sockaddr_in sin;
339 socklen_t len =
sizeof(sin);
340 getsockname(socketNumber_, (
struct sockaddr*)&sin, &len);
343 __COUT__ <<
"No new messages for "
344 << timeoutSeconds + timeoutUSeconds / 1000000. <<
"s (Total "
345 << readCounter_ * (timeoutSeconds + timeoutUSeconds / 1000000.)
346 <<
"s). Read request timed out for port: " << ntohs(sin.sin_port)
350 __COUT__ <<
"This a successful read" << std::endl;
int receive(std::string &buffer, unsigned int timeoutSeconds=1, unsigned int timeoutUSeconds=0, bool verbose=false)
returns count of dropped packets
ReceiverSocket(void)
protected constructor
defines used also by OtsConfigurationWizardSupervisor