otsdaq  3.04.02
ReceiverSocket.cc
1 #include "otsdaq/NetworkUtilities/ReceiverSocket.h"
2 #include "otsdaq/Macros/CoutMacros.h"
3 #include "otsdaq/MessageFacility/MessageFacility.h"
4 #include "otsdaq/NetworkUtilities/NetworkConverters.h"
5 
6 #include <iomanip> /* for setfill */
7 #include <iostream>
8 #include <sstream>
9 
10 #include <arpa/inet.h>
11 #include <sys/time.h>
12 #include <thread> // std::this_thread
13 
14 using namespace ots;
15 
16 //==============================================================================
17 ReceiverSocket::ReceiverSocket(std::string IPAddress, unsigned int port)
18  : Socket(IPAddress, port)
19  , addressLength_(sizeof(fromAddress_))
20  , numberOfBytes_(0)
21  , readCounter_(0)
22 {
23  __COUT__ << "ReceiverSocket constructor " << IPAddress << ":" << port << __E__;
24 }
25 
26 //==============================================================================
28 ReceiverSocket::ReceiverSocket(void)
29  : addressLength_(sizeof(fromAddress_)), numberOfBytes_(0), readCounter_(0)
30 {
31  __COUT__ << "ReceiverSocket constructor" << __E__;
32 }
33 
34 //==============================================================================
35 ReceiverSocket::~ReceiverSocket(void) {}
36 
37 //==============================================================================
38 std::string ReceiverSocket::getLastIncomingIPAddress(void)
39 {
40  std::string fromIP;
41  for(int i = 0; i < 4; i++)
42  {
43  fromIP += std::to_string((lastIncomingIPAddress_ << (i * 8)) & 0xff);
44  if(i < 3)
45  fromIP += ".";
46  }
47 
48  return fromIP;
49 } //end getLastIncomingIPAddress()
50 //==============================================================================
51 unsigned short ReceiverSocket::getLastIncomingPort(void)
52 {
53  return ntohs(lastIncomingPort_);
54 }
55 
56 //==============================================================================
57 int ReceiverSocket::receive(std::string& buffer,
58  unsigned int timeoutSeconds,
59  unsigned int timeoutUSeconds,
60  bool verbose)
61 {
62  return receive(buffer,
63  lastIncomingIPAddress_,
64  lastIncomingPort_,
65  timeoutSeconds,
66  timeoutUSeconds,
67  verbose);
68 } //end receive()
69 
70 //==============================================================================
74 int ReceiverSocket::receive(std::string& buffer,
75  unsigned long& fromIPAddress,
76  unsigned short& fromPort,
77  unsigned int timeoutSeconds,
78  unsigned int timeoutUSeconds,
79  bool verbose)
80 {
81  using clock = std::chrono::steady_clock;
82  auto start = clock::now();
83 
84  // lockout other receivers for the remainder of the scope
85  std::lock_guard<std::mutex> lock(receiveMutex_);
86 
87  __COUTT__ << " ----> Time receive check (socketNumber=" << socketNumber_ << ") ==> "
88  << std::chrono::duration_cast<std::chrono::milliseconds>(clock::now() -
89  start)
90  .count()
91  << " milliseconds. PID=" << getpid()
92  << " TID=" << std::this_thread::get_id() << std::endl;
93 
94  // set timeout period for select()
95  timeout_.tv_sec = timeoutSeconds;
96  timeout_.tv_usec = timeoutUSeconds;
97 
98  FD_ZERO(&fileDescriptor_);
99  FD_SET(socketNumber_, &fileDescriptor_);
100  auto rc = select(socketNumber_ + 1, &fileDescriptor_, 0, 0, &timeout_);
101 
102  if(rc < 0 && errno == EINTR)
103  __COUTT__ << "select interrupted by signal" << std::endl;
104 
105  __COUTT__ << " ----> Time receive (socketNumber=" << socketNumber_ << ", rc=" << rc
106  << ", errno=" << errno << ", timeout=" << timeoutSeconds << " "
107  << timeoutUSeconds << ") check ==> "
108  << std::chrono::duration_cast<std::chrono::milliseconds>(clock::now() -
109  start)
110  .count()
111  << " milliseconds. PID=" << getpid()
112  << " TID=" << std::this_thread::get_id() << std::endl;
113 
114  if(FD_ISSET(socketNumber_, &fileDescriptor_))
115  {
116  __COUTT__ << " ----> Time receive check ==> "
117  << std::chrono::duration_cast<std::chrono::milliseconds>(clock::now() -
118  start)
119  .count()
120  << " milliseconds." << std::endl;
121 
122  buffer.resize(maxSocketSize_); // NOTE: this is inexpensive according to
123  // Lorenzo/documentation in C++11 (only increases
124  // size once and doesn't decrease size)
125 
126  __COUTT__ << " ----> Time receive check ==> "
127  << std::chrono::duration_cast<std::chrono::milliseconds>(clock::now() -
128  start)
129  .count()
130  << " milliseconds." << std::endl;
131 
132  if((numberOfBytes_ = recvfrom(socketNumber_,
133  &buffer[0],
134  maxSocketSize_,
135  0,
136  (struct sockaddr*)&fromAddress_,
137  &addressLength_)) == -1)
138  {
139  __COUT__ << "At socket with IPAddress: " << getIPAddress()
140  << " port: " << getPort() << std::endl;
141  __SS__ << "Error reading buffer from\tIP:\t";
142  std::string fromIP = inet_ntoa(fromAddress_.sin_addr);
143  fromIPAddress = fromAddress_.sin_addr.s_addr;
144  fromPort = fromAddress_.sin_port;
145  lastIncomingIPAddress_ = fromIPAddress;
146  lastIncomingPort_ = fromPort;
147 
148  for(int i = 0; i < 4; i++)
149  {
150  ss << ((fromIPAddress << (i * 8)) & 0xff);
151  if(i < 3)
152  ss << ".";
153  }
154  ss << "\tPort\t" << ntohs(fromPort) << " IP " << fromIP << std::endl;
155  __COUT__ << "\n" << ss.str();
156  return -1;
157  }
158  // char address[INET_ADDRSTRLEN];
159  // inet_ntop(AF_INET, &(fromAddress.sin_addr), address, INET_ADDRSTRLEN);
160  fromIPAddress = fromAddress_.sin_addr.s_addr;
161  fromPort = fromAddress_.sin_port;
162  lastIncomingIPAddress_ = fromIPAddress;
163  lastIncomingPort_ = fromPort;
164 
165  __COUTT__ << " ----> Time receive " << numberOfBytes_ << " check ==> "
166  << std::chrono::duration_cast<std::chrono::milliseconds>(clock::now() -
167  start)
168  .count()
169  << " milliseconds." << std::endl;
170 
171  __COUTS__(2) << "IP: " << std::hex << fromIPAddress << std::dec
172  << " port: " << fromPort << std::endl
173  << "Socket Number: " << socketNumber_
174  << " number of bytes received: " << numberOfBytes_ << std::endl;
175 
176  // NOTE: this is inexpensive according to Lorenzo/documentation in C++11 (only
177  // increases size once and doesn't decrease size)
178  buffer.resize(numberOfBytes_);
179  readCounter_ = 0;
180 
181  if(verbose) // debug
182  {
183  std::string fromIP = inet_ntoa(fromAddress_.sin_addr);
184 
185  __COUT__ << "Receiving "
186  << " at: " << getIPAddress() << ":" << getPort()
187  << " from: " << fromIP << ":" << ntohs(fromPort)
188  << " size: " << buffer.size() << std::endl;
189 
190  if(TTEST(2))
191  {
192  std::stringstream ss;
193  ss << "\tRx";
194  uint32_t begin = 0;
195  for(uint32_t i = begin; i < buffer.size(); i++)
196  {
197  if(i == begin + 2)
198  ss << ":::";
199  else if(i == begin + 10)
200  ss << ":::";
201  ss << std::setfill('0') << std::setw(2) << std::hex
202  << (((int16_t)buffer[i]) & 0xFF) << "-" << std::dec;
203  }
204  ss << std::endl;
205  __COUTS__(2) << ss.str();
206  }
207  }
208  }
209  else
210  {
211  ++readCounter_;
212 
213  if(verbose)
214  __COUT__ << "No new messages for "
215  << timeoutSeconds + timeoutUSeconds / 1000000. << "s (Total "
216  << readCounter_ * (timeoutSeconds + timeoutUSeconds / 1000000.)
217  << "s). Read request timed out receiving on "
218  << " " << getIPAddress() << ":" << getPort() << std::endl;
219  return -1;
220  }
221 
222  __COUTT__ << " ----> Time receive check ==> "
223  << std::chrono::duration_cast<std::chrono::milliseconds>(clock::now() -
224  start)
225  .count()
226  << " milliseconds." << std::endl;
227 
228  return 0;
229 } //end receive()
230 
231 //==============================================================================
232 int ReceiverSocket::receive(std::vector<uint32_t>& buffer,
233  unsigned int timeoutSeconds,
234  unsigned int timeoutUSeconds,
235  bool verbose)
236 {
237  return receive(buffer,
238  lastIncomingIPAddress_,
239  lastIncomingPort_,
240  timeoutSeconds,
241  timeoutUSeconds,
242  verbose);
243 } //end receive()
244 
245 //==============================================================================
249 int ReceiverSocket::receive(std::vector<uint32_t>& buffer,
250  unsigned long& fromIPAddress,
251  unsigned short& fromPort,
252  unsigned int timeoutSeconds,
253  unsigned int timeoutUSeconds,
254  bool verbose)
255 {
256  using clock = std::chrono::steady_clock;
257  auto start = clock::now();
258 
259  // lockout other receivers for the remainder of the scope
260  std::lock_guard<std::mutex> lock(receiveMutex_);
261 
262  __COUTT__ << " ----> Time receive (socketNumber=" << socketNumber_ << ") check ==> "
263  << std::chrono::duration_cast<std::chrono::milliseconds>(clock::now() -
264  start)
265  .count()
266  << " milliseconds. PID=" << getpid()
267  << " TID=" << std::this_thread::get_id() << std::endl;
268 
269  // set timeout period for select()
270  timeout_.tv_sec = timeoutSeconds;
271  timeout_.tv_usec = timeoutUSeconds;
272 
273  FD_ZERO(&fileDescriptor_);
274  FD_SET(socketNumber_, &fileDescriptor_);
275  select(socketNumber_ + 1, &fileDescriptor_, 0, 0, &timeout_);
276 
277  __COUTT__ << " ----> Time receive (socketNumber=" << socketNumber_ << ") check ==> "
278  << std::chrono::duration_cast<std::chrono::milliseconds>(clock::now() -
279  start)
280  .count()
281  << " milliseconds. PID=" << getpid()
282  << " TID=" << std::this_thread::get_id() << std::endl;
283 
284  if(FD_ISSET(socketNumber_, &fileDescriptor_))
285  {
286  buffer.resize(maxSocketSize_ / sizeof(uint32_t)); // NOTE: this is inexpensive
287  // according to
288  // Lorezno/documentation in
289  // C++11 (only increases size
290  // once and doesn't decrease
291  // size)
292  if((numberOfBytes_ = recvfrom(socketNumber_,
293  &buffer[0],
294  maxSocketSize_,
295  0,
296  (struct sockaddr*)&fromAddress_,
297  &addressLength_)) == -1)
298  {
299  __COUT__ << "At socket with IPAddress: " << getIPAddress()
300  << " port: " << getPort() << std::endl;
301  __SS__ << "Error reading buffer from\tIP:\t";
302  std::string fromIP = inet_ntoa(fromAddress_.sin_addr);
303  fromIPAddress = fromAddress_.sin_addr.s_addr;
304  fromPort = fromAddress_.sin_port;
305  lastIncomingIPAddress_ = fromIPAddress;
306  lastIncomingPort_ = fromPort;
307 
308  for(int i = 0; i < 4; i++)
309  {
310  ss << ((fromIPAddress << (i * 8)) & 0xff);
311  if(i < 3)
312  ss << ".";
313  }
314  ss << "\tPort\t" << ntohs(fromPort) << " IP " << fromIP << std::endl;
315  __COUT__ << "\n" << ss.str();
316  return -1;
317  }
318  // char address[INET_ADDRSTRLEN];
319  // inet_ntop(AF_INET, &(fromAddress.sin_addr), address, INET_ADDRSTRLEN);
320  fromIPAddress = fromAddress_.sin_addr.s_addr;
321  fromPort = fromAddress_.sin_port;
322  lastIncomingIPAddress_ = fromIPAddress;
323  lastIncomingPort_ = fromPort;
324 
325  __COUTS__(2) << __PRETTY_FUNCTION__ << "IP: " << std::hex << fromIPAddress
326  << std::dec << " port: " << fromPort << std::endl
327  << "Socket Number: " << socketNumber_
328  << " number of bytes: " << numberOfBytes_ << std::endl;
329 
330  // NOTE: this is inexpensive according to Lorenzo/documentation in C++11 (only
331  // increases size once and doesn't decrease size)
332  buffer.resize(numberOfBytes_ / sizeof(uint32_t));
333  readCounter_ = 0;
334  }
335  else
336  {
337  ++readCounter_;
338  struct sockaddr_in sin;
339  socklen_t len = sizeof(sin);
340  getsockname(socketNumber_, (struct sockaddr*)&sin, &len);
341 
342  if(verbose)
343  __COUT__ << "No new messages for "
344  << timeoutSeconds + timeoutUSeconds / 1000000. << "s (Total "
345  << readCounter_ * (timeoutSeconds + timeoutUSeconds / 1000000.)
346  << "s). Read request timed out for port: " << ntohs(sin.sin_port)
347  << std::endl;
348  return -1;
349  }
350  __COUT__ << "This a successful read" << std::endl;
351  return 0;
352 } //end receive()
int receive(std::string &buffer, unsigned int timeoutSeconds=1, unsigned int timeoutUSeconds=0, bool verbose=false)
returns count of dropped packets
ReceiverSocket(void)
protected constructor
defines used also by OtsConfigurationWizardSupervisor