otsdaq  3.07.00
TCPServerBase.cc
1 // #ifndef BEAGLEBONE
2 // #include "otsdaq_cmsburninbox/BeagleBone/BeagleBoneUtils/TCPServerBase.h"
3 // #else
4 #include "otsdaq/NetworkUtilities/TCPServerBase.h"
5 #include "otsdaq/Macros/CoutMacros.h"
6 #include "otsdaq/NetworkUtilities/TCPTransmitterSocket.h"
7 
8 // #endif
9 
#include <arpa/inet.h>
#include <errno.h>   // errno
#include <string.h>  // strerror
#include <sys/socket.h>
#include <unistd.h>  // ::close
#include <iostream>
#include <thread>
16 
17 // #include <sys/socket.h>
18 // #include <netinet/in.h>
19 // #include <netdb.h>
20 
21 using namespace ots;
22 
23 //==============================================================================
24 TCPServerBase::TCPServerBase(unsigned int serverPort, unsigned int maxNumberOfClients)
25  : fMaxNumberOfClients(maxNumberOfClients), fServerPort(serverPort), fAccept(true)
26 {
27  // 0 or -1 means no restrictions on the number of clients
28  if(fMaxNumberOfClients == 0)
29  fMaxNumberOfClients = (unsigned)-1;
30  // CANNOT GO IN THE CONSTRUCTOR OR IT MIGHT START BEFORE THE CHILD CLASS CONSTRUCTOR IS FULLY CONSTRUCTED
31  // THIS MIGHT RESULT IN THE CALL OF THE VIRTUAL TCPServerBase::acceptConnections
32  // startAccept();
33 }
34 
35 //==============================================================================
36 TCPServerBase::~TCPServerBase(void)
37 {
38  __COUT__ << "Shutting down accept for socket: " << getSocketId() << std::endl;
39  shutdownAccept();
40  while(fAcceptFuture.valid() && fAcceptFuture.wait_for(std::chrono::milliseconds(
41  100)) != std::future_status::ready)
42  {
43  __COUT__ << "Server accept still running" << std::endl;
44  shutdownAccept();
45  }
46  //__COUT__ << "Closing connected client sockets for socket: " << getSocketId() << std::endl;
47  closeClientSockets();
48  //__COUT__ << "Closed all sockets connected to server: " << getSocketId() << std::endl;
49 }
50 
51 //==============================================================================
52 void TCPServerBase::startAccept(void)
53 {
54  struct sockaddr_in checkAddr;
55  socklen_t addrLen = sizeof(checkAddr);
56  if(::getsockname(getSocketId(), (struct sockaddr*)&checkAddr, &addrLen) == 0 &&
57  checkAddr.sin_port != 0)
58  {
59  __COUT__ << "Socket " << getSocketId() << " is already bound to port "
60  << ntohs(checkAddr.sin_port) << std::endl;
61  return;
62  }
63 
64  // __COUT__ << "Begin startAccept" << std::endl;
65  int opt = 1; // SO_REUSEADDR - man socket(7)
66  if(::setsockopt(getSocketId(), SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(int)) == -1)
67  {
68  close();
69  __SS__ << "Setsockopt: " << strerror(errno) << __E__;
70  __SS_THROW__;
71  }
72 
73  struct sockaddr_in serverAddr;
74  bzero((char*)&serverAddr, sizeof(serverAddr));
75  serverAddr.sin_family = AF_INET;
76  serverAddr.sin_port = htons(fServerPort);
77  serverAddr.sin_addr.s_addr = INADDR_ANY;
78 
79  if(::bind(getSocketId(), (struct sockaddr*)&serverAddr, sizeof(serverAddr)) != 0)
80  {
81  close();
82  __SS__ << "Bind to port " << std::to_string(fServerPort) << ": "
83  << strerror(errno) << __E__;
84  __SS_THROW__;
85  }
86  // freeaddrinfo(serverAddr); // all done with this structure
87 
88  if(::listen(getSocketId(), fMaxConnectionBacklog) != 0)
89  {
90  close();
91  __SS__ << "Listen on port " << std::to_string(fServerPort) << ": "
92  << strerror(errno) << __E__;
93  __SS_THROW__;
94  }
95 
96  fAccept = true;
97  fAcceptFuture =
98  std::async(std::launch::async, &TCPServerBase::acceptConnections, this);
99  // __COUT__ << "Done startAccept" << std::endl;
100 }
101 
103 //==============================================================================
104 int TCPServerBase::accept(bool blocking)
105 {
106  __COUT__ << "Now server accept connections on socket: " << getSocketId() << std::endl;
107  if(getSocketId() == invalidSocketId)
108  {
109  __SS__ << "Accept called on a bad socket object (this object was moved)" << __E__;
110  __SS_THROW__;
111  }
112 
113  struct sockaddr_storage clientAddress; // connector's address information
114  socklen_t clientAddressSize = sizeof(clientAddress);
115  int clientSocket = invalidSocketId;
116  if(blocking)
117  {
118  //__COUT__ << "Number of connected clients: " << fConnectedClients.size() << std::endl;
119  // clientSocket = ::accept4(getSocketId(),(struct sockaddr *)&clientAddress, &clientAddressSize, 0);
120  // unsigned counter = 0;
121  __COUT__ << "Client list on input:\n";
122  for(auto it = fConnectedClients.begin(); it != fConnectedClients.end(); it++)
123  {
124  __COUT__ << " --> Client: " << it->first << " : " << it->second << std::endl;
125  }
126  while(true)
127  {
128  clientSocket = ::accept(
129  getSocketId(), (struct sockaddr*)&clientAddress, &clientAddressSize);
130  __COUT__ << ": clientSocket returned = " << clientSocket << std::endl;
131 
132  //FIXME: Commenting out this line to avoid seg-fault in the case there are two clients connecting from the same process...
133  // pingActiveClients(); // This message is to check if there are clients that disconnected and, if so, they are removed from the client list
134  if(fAccept && fMaxNumberOfClients > 0 &&
135  fConnectedClients.size() >= fMaxNumberOfClients)
136  {
137  send(clientSocket, "Too many clients connected!", 27, 0);
138  ::shutdown(clientSocket, SHUT_WR);
139  continue;
140  }
141  break;
142  }
143  __COUT__ << "fAccept? " << fAccept << std::endl;
144  if(!fAccept)
145  {
146  throw E_SHUTDOWN;
147  }
148  else if(clientSocket == invalidSocketId)
149  {
150  __COUT__ << "New socket invalid?: " << clientSocket << " errno: " << errno
151  << std::endl;
152  __SS__ << "Accept: " << strerror(errno) << __E__;
153  __SS_THROW__;
154  }
155 
156  __COUT__ << "Server just accepted a connection on socket: " << getSocketId()
157  << " Client socket: " << clientSocket << std::endl;
158  return clientSocket;
159  }
160  else
161  {
162  constexpr int sleepMSeconds = 5;
163  constexpr int timeoutSeconds = 0;
164  constexpr int timeoutUSeconds = 1000;
165  struct timeval timeout;
166  timeout.tv_sec = timeoutSeconds;
167  timeout.tv_usec = timeoutUSeconds;
168 
169  fd_set fdSet;
170 
171  while(fAccept)
172  {
173  FD_ZERO(&fdSet);
174  FD_SET(getSocketId(), &fdSet);
175  select(getSocketId() + 1, &fdSet, 0, 0, &timeout);
176 
177  if(FD_ISSET(getSocketId(), &fdSet))
178  {
179  struct sockaddr_in clientAddress;
180  socklen_t socketSize = sizeof(clientAddress);
181  // int newSocketFD = ::accept4(fdServerSocket_,(struct sockaddr*)&clientAddress,&socketSize, (pushOnly_ ? SOCK_NONBLOCK : 0));
182  clientSocket = ::accept(
183  getSocketId(),
184  (struct sockaddr*)&clientAddress,
185  &socketSize); // Blocking since select goes in timeout if there is nothing
186  if(clientSocket == invalidSocketId)
187  {
188  __COUT__ << "New socket invalid?: " << clientSocket
189  << " errno: " << errno << std::endl;
190  __SS__ << "Accept: " << strerror(errno) << __E__;
191  __SS_THROW__;
192  }
193  return clientSocket;
194  }
195  std::this_thread::sleep_for(std::chrono::milliseconds(sleepMSeconds));
196  }
197  throw E_SHUTDOWN;
198  }
199 }
200 
201 //==============================================================================
203 void TCPServerBase::closeClientSockets(void)
204 {
205  for(auto& socket : fConnectedClients)
206  {
207  try
208  {
209  socket.second->sendClose();
210  }
211  catch(const std::exception& e)
212  {
213  // I can get here with the TCPPubishServer because it doesn't keep track of the clients that might have already disconnected
214  // Just do nothing!
215  __COUT__ << e.what() << '\n';
216  }
217 
218  auto clientThread = fConnectedClientsFuture.find(socket.first);
219  if(clientThread != fConnectedClientsFuture.end())
220  clientThread->second.wait(); // Waiting for client thread
221  delete socket.second;
222  }
223  fConnectedClients.clear();
224  fConnectedClientsFuture.clear();
225 }
226 
227 //==============================================================================
228 void TCPServerBase::closeClientSocket(int socket)
229 {
230  // This method is called inside the thread itself so it cannot call the removeClientSocketFuture!!!
231  auto it = fConnectedClients.find(socket);
232  if(it != fConnectedClients.end())
233  {
234  if(it->second->getSocketId() == socket)
235  {
236  try
237  {
238  if(it->second != nullptr)
239  it->second->sendClose();
240  }
241  catch(const std::exception& e)
242  {
243  // I can get here with the TCPPubishServer because it doesn't keep track of the clients that might have already disconnected
244  // Just do nothing!
245  __COUT__ << e.what() << '\n';
246  }
247  delete it->second;
248  fConnectedClients.erase(it);
249  }
250  else
251  {
252  __SS__
253  << "SocketId in fConnectedClients != socketId in TCPSocket! Impossible!!!"
254  << __E__;
255  __SS_THROW__;
256  }
257  }
258 }
259 
260 //==============================================================================
261 void TCPServerBase::broadcastPacket(const char* message, std::size_t length)
262 {
263  broadcastPacket(std::string(message, length));
264 }
265 
266 //==============================================================================
267 void TCPServerBase::broadcastPacket(const std::string& message)
268 {
269  for(auto it = fConnectedClients.begin(); it != fConnectedClients.end(); it++)
270  {
271  try
272  {
273  if(auto sock = dynamic_cast<TCPTransmitterSocket*>(it->second);
274  sock != nullptr)
275  sock->sendPacket(message);
276  }
277  catch(const std::exception& e)
278  {
279  // __COUT__ << "I don't think that this error is possible because I close the socket when I get disconnected...if you see this then you should
280  // contact Lorenzo Uplegger" << std::endl;
281  // __COUT__ << "This should only happen with the TCPSubscribeServer because it doesn't keep track of the connected clients..." << std::endl;
282  // __COUT__ << "Error: " << e.what() << std::endl;
283  if(fConnectedClientsFuture.find(it->first) != fConnectedClientsFuture.end())
284  fConnectedClientsFuture.erase(fConnectedClientsFuture.find(it->first));
285  delete it->second;
286  fConnectedClients.erase(it--);
287  }
288  }
289 }
290 
291 //========================================================================================================================
292 void TCPServerBase::broadcast(const char* message, std::size_t length)
293 {
294  // std::lock_guard<std::mutex> lock(clientsMutex_);
295  for(auto it = fConnectedClients.begin(); it != fConnectedClients.end(); it++)
296  {
297  try
298  {
299  if(auto sock = dynamic_cast<TCPTransmitterSocket*>(it->second);
300  sock != nullptr)
301  sock->send(message, length);
302  }
303  catch(const std::exception& e)
304  {
305  // __COUT__ << "I don't think that this error is possible because I close the socket when I get disconnected...if you see this then you should
306  // contact Lorenzo Uplegger" << std::endl;
307  // __COUT__ << "This should only happen with the TCPSubscribeServer because it doesn't keep track of the connected clients..." << std::endl;
308  // __COUT__ << "Error: " << e.what() << std::endl;
309  if(fConnectedClientsFuture.find(it->first) != fConnectedClientsFuture.end())
310  fConnectedClientsFuture.erase(fConnectedClientsFuture.find(it->first));
311  delete it->second;
312  fConnectedClients.erase(it--);
313  }
314  }
315 }
316 
317 //==============================================================================
318 void TCPServerBase::broadcast(const std::string& message)
319 {
320  for(auto it = fConnectedClients.begin(); it != fConnectedClients.end(); it++)
321  {
322  try
323  {
324  if(auto sock = dynamic_cast<TCPTransmitterSocket*>(it->second);
325  sock != nullptr)
326  sock->send(message);
327  }
328  catch(const std::exception& e)
329  {
330  // __COUT__ << "I don't think that this error is possible because I close the socket when I get disconnected...if you see this then you should
331  // contact Lorenzo Uplegger" << std::endl;
332  // __COUT__ << "This should only happen with the TCPSubscribeServer because it doesn't keep track of the connected clients..." << std::endl;
333  // __COUT__ << "Error: " << e.what() << std::endl;
334  if(fConnectedClientsFuture.find(it->first) != fConnectedClientsFuture.end())
335  fConnectedClientsFuture.erase(fConnectedClientsFuture.find(it->first));
336  delete it->second;
337  fConnectedClients.erase(it--);
338  }
339  }
340 }
341 
342 //==============================================================================
343 void TCPServerBase::broadcast(const std::vector<char>& message)
344 {
345  for(auto it = fConnectedClients.begin(); it != fConnectedClients.end(); it++)
346  {
347  try
348  {
349  if(auto sock = dynamic_cast<TCPTransmitterSocket*>(it->second);
350  sock != nullptr)
351  sock->send(message);
352  }
353  catch(const std::exception& e)
354  {
355  // __COUT__ << "I don't think that this error is possible because I close the socket when I get disconnected...if you see this then you should
356  // contact Lorenzo Uplegger" << std::endl;
357  // __COUT__ << "This should only happen with the TCPSubscribeServer because it doesn't keep track of the connected clients..." << std::endl;
358  // __COUT__ << "Error: " << e.what() << std::endl;
359  if(fConnectedClientsFuture.find(it->first) != fConnectedClientsFuture.end())
360  fConnectedClientsFuture.erase(fConnectedClientsFuture.find(it->first));
361  delete it->second;
362  fConnectedClients.erase(it--);
363  }
364  }
365 }
366 
367 //==============================================================================
368 void TCPServerBase::broadcast(const std::vector<uint16_t>& message)
369 {
370  for(auto it = fConnectedClients.begin(); it != fConnectedClients.end(); it++)
371  {
372  try
373  {
374  if(auto sock = dynamic_cast<TCPTransmitterSocket*>(it->second);
375  sock != nullptr)
376  sock->send(message);
377  }
378  catch(const std::exception& e)
379  {
380  // __COUT__ << "This should only happen with the TCPSubscribeServer because it doesn't keep track of the connected clients..." << std::endl;
381  __COUT__ << "Error: " << e.what() << std::endl;
382  if(fConnectedClientsFuture.find(it->first) != fConnectedClientsFuture.end())
383  {
384  __COUT__ << "Removing client entry from future connected clients list\n";
385  fConnectedClientsFuture.erase(fConnectedClientsFuture.find(it->first));
386  }
387  __COUT__ << "Removing client entry from connected clients list\n";
388  delete it->second;
389  fConnectedClients.erase(it--);
390  }
391  }
392 }
393 
394 //==============================================================================
395 void TCPServerBase::pingActiveClients()
396 {
397  for(auto it = fConnectedClients.begin(); it != fConnectedClients.end(); it++)
398  {
399  __COUT__ << "Pinging client " << it->first << " : " << it->second << std::endl;
400  try
401  {
402  if(auto sock = dynamic_cast<TCPTransmitterSocket*>(it->second);
403  sock != nullptr)
404  sock->send("", 0, true);
405  }
406  catch(const std::exception& e)
407  {
408  // __COUT__ << "I don't think that this error is possible because I close the socket when I get disconnected...if you see this then you should
409  // contact Lorenzo Uplegger" << std::endl;
410  // __COUT__ << "This should only happen with the TCPSubscribeServer because it doesn't keep track of the connected clients..." << std::endl;
411  __COUT__ << "Error: " << e.what() << std::endl;
412  if(fConnectedClientsFuture.find(it->first) != fConnectedClientsFuture.end())
413  {
414  __COUT__ << "Removing client entry from future connected clients list\n";
415  fConnectedClientsFuture.erase(fConnectedClientsFuture.find(it->first));
416  }
417  __COUT__ << "Removing client entry from connected clients list\n";
418  delete it->second;
419  fConnectedClients.erase(it--);
420  }
421  }
422 }
423 
424 //==============================================================================
425 void TCPServerBase::shutdownAccept()
426 {
427  fAccept = false;
428  shutdown(getSocketId(), SHUT_RD);
429 }
TCPServerBase(unsigned int serverPort, unsigned int maxNumberOfClients=0)
A value of 0 means as many clients as an unsigned int allows.
A class that can write to a socket.
defines used also by OtsConfigurationWizardSupervisor