otsdaq  3.03.00
TCPServerBase.cc
1 // #ifndef BEAGLEBONE
2 // #include "otsdaq_cmsburninbox/BeagleBone/BeagleBoneUtils/TCPServerBase.h"
3 // #else
4 #include "otsdaq/NetworkUtilities/TCPServerBase.h"
5 #include "otsdaq/Macros/CoutMacros.h"
6 #include "otsdaq/NetworkUtilities/TCPTransmitterSocket.h"
7 
8 // #endif
9 
#include <arpa/inet.h>
#include <errno.h>   // errno
#include <string.h>  // strerror, memset
#include <sys/socket.h>
#include <unistd.h>  // ::close
#include <iostream>
#include <thread>
16 
17 // #include <sys/socket.h>
18 // #include <netinet/in.h>
19 // #include <netdb.h>
20 
21 using namespace ots;
22 
23 //==============================================================================
24 TCPServerBase::TCPServerBase(unsigned int serverPort, unsigned int maxNumberOfClients)
25  : fMaxNumberOfClients(maxNumberOfClients), fServerPort(serverPort), fAccept(true)
26 {
27  // 0 or -1 means no restrictions on the number of clients
28  if(fMaxNumberOfClients == 0)
29  fMaxNumberOfClients = (unsigned)-1;
30  // CANNOT GO IN THE CONSTRUCTOR OR IT MIGHT START BEFORE THE CHILD CLASS CONSTRUCTOR IS FULLY CONSTRUCTED
31  // THIS MIGHT RESULT IN THE CALL OF THE VIRTUAL TCPServerBase::acceptConnections
32  // startAccept();
33 }
34 
35 //==============================================================================
36 TCPServerBase::~TCPServerBase(void)
37 {
38  __COUT__ << "Shutting down accept for socket: " << getSocketId() << std::endl;
39  shutdownAccept();
40  while(fAcceptFuture.valid() && fAcceptFuture.wait_for(std::chrono::milliseconds(
41  100)) != std::future_status::ready)
42  {
43  __COUT__ << "Server accept still running" << std::endl;
44  shutdownAccept();
45  }
46  //__COUT__ << "Closing connected client sockets for socket: " << getSocketId() << std::endl;
47  closeClientSockets();
48  //__COUT__ << "Closed all sockets connected to server: " << getSocketId() << std::endl;
49 }
50 
51 //==============================================================================
52 void TCPServerBase::startAccept(void)
53 {
54  struct sockaddr_in checkAddr;
55  socklen_t addrLen = sizeof(checkAddr);
56  if(::getsockname(getSocketId(), (struct sockaddr*)&checkAddr, &addrLen) == 0 &&
57  checkAddr.sin_port != 0)
58  {
59  __COUT__ << "Socket " << getSocketId() << " is already bound to port "
60  << ntohs(checkAddr.sin_port) << std::endl;
61  return;
62  }
63 
64  // __COUT__ << "Begin startAccept" << std::endl;
65  int opt = 1; // SO_REUSEADDR - man socket(7)
66  if(::setsockopt(getSocketId(), SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(int)) == -1)
67  {
68  close();
69  __SS__ << "Setsockopt: " << strerror(errno) << __E__;
70  __SS_THROW__;
71  }
72 
73  struct sockaddr_in serverAddr;
74  bzero((char*)&serverAddr, sizeof(serverAddr));
75  serverAddr.sin_family = AF_INET;
76  serverAddr.sin_port = htons(fServerPort);
77  serverAddr.sin_addr.s_addr = INADDR_ANY;
78 
79  if(::bind(getSocketId(), (struct sockaddr*)&serverAddr, sizeof(serverAddr)) != 0)
80  {
81  close();
82  __SS__ << "Bind: " << strerror(errno) << __E__;
83  __SS_THROW__;
84  }
85  // freeaddrinfo(serverAddr); // all done with this structure
86 
87  if(::listen(getSocketId(), fMaxConnectionBacklog) != 0)
88  {
89  close();
90  __SS__ << "Listen: " << strerror(errno) << __E__;
91  __SS_THROW__;
92  }
93 
94  fAccept = true;
95  fAcceptFuture =
96  std::async(std::launch::async, &TCPServerBase::acceptConnections, this);
97  // __COUT__ << "Done startAccept" << std::endl;
98 }
99 
101 //==============================================================================
102 int TCPServerBase::accept(bool blocking)
103 {
104  __COUT__ << "Now server accept connections on socket: " << getSocketId() << std::endl;
105  if(getSocketId() == invalidSocketId)
106  {
107  __SS__ << "Accept called on a bad socket object (this object was moved)" << __E__;
108  __SS_THROW__;
109  }
110 
111  struct sockaddr_storage clientAddress; // connector's address information
112  socklen_t clientAddressSize = sizeof(clientAddress);
113  int clientSocket = invalidSocketId;
114  if(blocking)
115  {
116  //__COUT__ << "Number of connected clients: " << fConnectedClients.size() << std::endl;
117  // clientSocket = ::accept4(getSocketId(),(struct sockaddr *)&clientAddress, &clientAddressSize, 0);
118  // unsigned counter = 0;
119  __COUT__ << "Client list on input:\n";
120  for(auto it = fConnectedClients.begin(); it != fConnectedClients.end(); it++)
121  {
122  __COUT__ << " --> Client: " << it->first << " : " << it->second << std::endl;
123  }
124  while(true)
125  {
126  clientSocket = ::accept(
127  getSocketId(), (struct sockaddr*)&clientAddress, &clientAddressSize);
128  __COUT__ << ": clientSocket returned = " << clientSocket << std::endl;
129 
130  //FIXME: Commenting out this line to avoid seg-fault in the case there are two clients connecting from the same process...
131  // pingActiveClients(); // This message is to check if there are clients that disconnected and, if so, they are removed from the client list
132  if(fAccept && fMaxNumberOfClients > 0 &&
133  fConnectedClients.size() >= fMaxNumberOfClients)
134  {
135  send(clientSocket, "Too many clients connected!", 27, 0);
136  ::shutdown(clientSocket, SHUT_WR);
137  continue;
138  }
139  break;
140  }
141  __COUT__ << "fAccept? " << fAccept << std::endl;
142  if(!fAccept)
143  {
144  throw E_SHUTDOWN;
145  }
146  else if(clientSocket == invalidSocketId)
147  {
148  __COUT__ << "New socket invalid?: " << clientSocket << " errno: " << errno
149  << std::endl;
150  __SS__ << "Accept: " << strerror(errno) << __E__;
151  __SS_THROW__;
152  }
153 
154  __COUT__ << "Server just accepted a connection on socket: " << getSocketId()
155  << " Client socket: " << clientSocket << std::endl;
156  return clientSocket;
157  }
158  else
159  {
160  constexpr int sleepMSeconds = 5;
161  constexpr int timeoutSeconds = 0;
162  constexpr int timeoutUSeconds = 1000;
163  struct timeval timeout;
164  timeout.tv_sec = timeoutSeconds;
165  timeout.tv_usec = timeoutUSeconds;
166 
167  fd_set fdSet;
168 
169  while(fAccept)
170  {
171  FD_ZERO(&fdSet);
172  FD_SET(getSocketId(), &fdSet);
173  select(getSocketId() + 1, &fdSet, 0, 0, &timeout);
174 
175  if(FD_ISSET(getSocketId(), &fdSet))
176  {
177  struct sockaddr_in clientAddress;
178  socklen_t socketSize = sizeof(clientAddress);
179  // int newSocketFD = ::accept4(fdServerSocket_,(struct sockaddr*)&clientAddress,&socketSize, (pushOnly_ ? SOCK_NONBLOCK : 0));
180  clientSocket = ::accept(
181  getSocketId(),
182  (struct sockaddr*)&clientAddress,
183  &socketSize); // Blocking since select goes in timeout if there is nothing
184  if(clientSocket == invalidSocketId)
185  {
186  __COUT__ << "New socket invalid?: " << clientSocket
187  << " errno: " << errno << std::endl;
188  __SS__ << "Accept: " << strerror(errno) << __E__;
189  __SS_THROW__;
190  }
191  return clientSocket;
192  }
193  std::this_thread::sleep_for(std::chrono::milliseconds(sleepMSeconds));
194  }
195  throw E_SHUTDOWN;
196  }
197 }
198 
199 //==============================================================================
201 void TCPServerBase::closeClientSockets(void)
202 {
203  for(auto& socket : fConnectedClients)
204  {
205  try
206  {
207  socket.second->sendClose();
208  }
209  catch(const std::exception& e)
210  {
211  // I can get here with the TCPPubishServer because it doesn't keep track of the clients that might have already disconnected
212  // Just do nothing!
213  __COUT__ << e.what() << '\n';
214  }
215 
216  auto clientThread = fConnectedClientsFuture.find(socket.first);
217  if(clientThread != fConnectedClientsFuture.end())
218  clientThread->second.wait(); // Waiting for client thread
219  delete socket.second;
220  }
221  fConnectedClients.clear();
222  fConnectedClientsFuture.clear();
223 }
224 
225 //==============================================================================
226 void TCPServerBase::closeClientSocket(int socket)
227 {
228  // This method is called inside the thread itself so it cannot call the removeClientSocketFuture!!!
229  auto it = fConnectedClients.find(socket);
230  if(it != fConnectedClients.end())
231  {
232  if(it->second->getSocketId() == socket)
233  {
234  try
235  {
236  if(it->second != nullptr)
237  it->second->sendClose();
238  }
239  catch(const std::exception& e)
240  {
241  // I can get here with the TCPPubishServer because it doesn't keep track of the clients that might have already disconnected
242  // Just do nothing!
243  __COUT__ << e.what() << '\n';
244  }
245  delete it->second;
246  fConnectedClients.erase(it);
247  }
248  else
249  {
250  __SS__
251  << "SocketId in fConnectedClients != socketId in TCPSocket! Impossible!!!"
252  << __E__;
253  __SS_THROW__;
254  }
255  }
256 }
257 
258 //==============================================================================
259 void TCPServerBase::broadcastPacket(const char* message, std::size_t length)
260 {
261  broadcastPacket(std::string(message, length));
262 }
263 
264 //==============================================================================
265 void TCPServerBase::broadcastPacket(const std::string& message)
266 {
267  for(auto it = fConnectedClients.begin(); it != fConnectedClients.end(); it++)
268  {
269  try
270  {
271  if(auto sock = dynamic_cast<TCPTransmitterSocket*>(it->second);
272  sock != nullptr)
273  sock->sendPacket(message);
274  }
275  catch(const std::exception& e)
276  {
277  // __COUT__ << "I don't think that this error is possible because I close the socket when I get disconnected...if you see this then you should
278  // contact Lorenzo Uplegger" << std::endl;
279  // __COUT__ << "This should only happen with the TCPSubscribeServer because it doesn't keep track of the connected clients..." << std::endl;
280  // __COUT__ << "Error: " << e.what() << std::endl;
281  if(fConnectedClientsFuture.find(it->first) != fConnectedClientsFuture.end())
282  fConnectedClientsFuture.erase(fConnectedClientsFuture.find(it->first));
283  delete it->second;
284  fConnectedClients.erase(it--);
285  }
286  }
287 }
288 
289 //========================================================================================================================
290 void TCPServerBase::broadcast(const char* message, std::size_t length)
291 {
292  // std::lock_guard<std::mutex> lock(clientsMutex_);
293  for(auto it = fConnectedClients.begin(); it != fConnectedClients.end(); it++)
294  {
295  try
296  {
297  if(auto sock = dynamic_cast<TCPTransmitterSocket*>(it->second);
298  sock != nullptr)
299  sock->send(message, length);
300  }
301  catch(const std::exception& e)
302  {
303  // __COUT__ << "I don't think that this error is possible because I close the socket when I get disconnected...if you see this then you should
304  // contact Lorenzo Uplegger" << std::endl;
305  // __COUT__ << "This should only happen with the TCPSubscribeServer because it doesn't keep track of the connected clients..." << std::endl;
306  // __COUT__ << "Error: " << e.what() << std::endl;
307  if(fConnectedClientsFuture.find(it->first) != fConnectedClientsFuture.end())
308  fConnectedClientsFuture.erase(fConnectedClientsFuture.find(it->first));
309  delete it->second;
310  fConnectedClients.erase(it--);
311  }
312  }
313 }
314 
315 //==============================================================================
316 void TCPServerBase::broadcast(const std::string& message)
317 {
318  for(auto it = fConnectedClients.begin(); it != fConnectedClients.end(); it++)
319  {
320  try
321  {
322  if(auto sock = dynamic_cast<TCPTransmitterSocket*>(it->second);
323  sock != nullptr)
324  sock->send(message);
325  }
326  catch(const std::exception& e)
327  {
328  // __COUT__ << "I don't think that this error is possible because I close the socket when I get disconnected...if you see this then you should
329  // contact Lorenzo Uplegger" << std::endl;
330  // __COUT__ << "This should only happen with the TCPSubscribeServer because it doesn't keep track of the connected clients..." << std::endl;
331  // __COUT__ << "Error: " << e.what() << std::endl;
332  if(fConnectedClientsFuture.find(it->first) != fConnectedClientsFuture.end())
333  fConnectedClientsFuture.erase(fConnectedClientsFuture.find(it->first));
334  delete it->second;
335  fConnectedClients.erase(it--);
336  }
337  }
338 }
339 
340 //==============================================================================
341 void TCPServerBase::broadcast(const std::vector<char>& message)
342 {
343  for(auto it = fConnectedClients.begin(); it != fConnectedClients.end(); it++)
344  {
345  try
346  {
347  if(auto sock = dynamic_cast<TCPTransmitterSocket*>(it->second);
348  sock != nullptr)
349  sock->send(message);
350  }
351  catch(const std::exception& e)
352  {
353  // __COUT__ << "I don't think that this error is possible because I close the socket when I get disconnected...if you see this then you should
354  // contact Lorenzo Uplegger" << std::endl;
355  // __COUT__ << "This should only happen with the TCPSubscribeServer because it doesn't keep track of the connected clients..." << std::endl;
356  // __COUT__ << "Error: " << e.what() << std::endl;
357  if(fConnectedClientsFuture.find(it->first) != fConnectedClientsFuture.end())
358  fConnectedClientsFuture.erase(fConnectedClientsFuture.find(it->first));
359  delete it->second;
360  fConnectedClients.erase(it--);
361  }
362  }
363 }
364 
365 //==============================================================================
366 void TCPServerBase::broadcast(const std::vector<uint16_t>& message)
367 {
368  for(auto it = fConnectedClients.begin(); it != fConnectedClients.end(); it++)
369  {
370  try
371  {
372  if(auto sock = dynamic_cast<TCPTransmitterSocket*>(it->second);
373  sock != nullptr)
374  sock->send(message);
375  }
376  catch(const std::exception& e)
377  {
378  // __COUT__ << "This should only happen with the TCPSubscribeServer because it doesn't keep track of the connected clients..." << std::endl;
379  __COUT__ << "Error: " << e.what() << std::endl;
380  if(fConnectedClientsFuture.find(it->first) != fConnectedClientsFuture.end())
381  {
382  __COUT__ << "Removing client entry from future connected clients list\n";
383  fConnectedClientsFuture.erase(fConnectedClientsFuture.find(it->first));
384  }
385  __COUT__ << "Removing client entry from connected clients list\n";
386  delete it->second;
387  fConnectedClients.erase(it--);
388  }
389  }
390 }
391 
392 //==============================================================================
393 void TCPServerBase::pingActiveClients()
394 {
395  for(auto it = fConnectedClients.begin(); it != fConnectedClients.end(); it++)
396  {
397  __COUT__ << "Pinging client " << it->first << " : " << it->second << std::endl;
398  try
399  {
400  if(auto sock = dynamic_cast<TCPTransmitterSocket*>(it->second);
401  sock != nullptr)
402  sock->send("", 0, true);
403  }
404  catch(const std::exception& e)
405  {
406  // __COUT__ << "I don't think that this error is possible because I close the socket when I get disconnected...if you see this then you should
407  // contact Lorenzo Uplegger" << std::endl;
408  // __COUT__ << "This should only happen with the TCPSubscribeServer because it doesn't keep track of the connected clients..." << std::endl;
409  __COUT__ << "Error: " << e.what() << std::endl;
410  if(fConnectedClientsFuture.find(it->first) != fConnectedClientsFuture.end())
411  {
412  __COUT__ << "Removing client entry from future connected clients list\n";
413  fConnectedClientsFuture.erase(fConnectedClientsFuture.find(it->first));
414  }
415  __COUT__ << "Removing client entry from connected clients list\n";
416  delete it->second;
417  fConnectedClients.erase(it--);
418  }
419  }
420 }
421 
422 //==============================================================================
423 void TCPServerBase::shutdownAccept()
424 {
425  fAccept = false;
426  shutdown(getSocketId(), SHUT_RD);
427 }
TCPServerBase(unsigned int serverPort, unsigned int maxNumberOfClients=0)
A value of 0 means no limit: as many clients as an unsigned int allows.
A class that can write to a socket.