otsdaq  3.03.00
TCPServerBase.cc
1 // #ifndef BEAGLEBONE
2 // #include "otsdaq_cmsburninbox/BeagleBone/BeagleBoneUtils/TCPServerBase.h"
3 // #else
4 #include "otsdaq/NetworkUtilities/TCPServerBase.h"
5 #include "otsdaq/Macros/CoutMacros.h"
6 #include "otsdaq/NetworkUtilities/TCPTransmitterSocket.h"
7 
8 // #endif
9 
10 #include <arpa/inet.h>
11 #include <errno.h> // errno
12 #include <string.h> // errno
13 #include <sys/socket.h>
14 #include <iostream>
15 #include <thread>
16 
17 // #include <sys/socket.h>
18 // #include <netinet/in.h>
19 // #include <netdb.h>
20 
21 using namespace ots;
22 
23 //==============================================================================
24 TCPServerBase::TCPServerBase(unsigned int serverPort, unsigned int maxNumberOfClients)
25  : fMaxNumberOfClients(maxNumberOfClients), fServerPort(serverPort), fAccept(true)
26 {
27  // 0 or -1 means no restrictions on the number of clients
28  if(fMaxNumberOfClients == 0)
29  fMaxNumberOfClients = (unsigned)-1;
30  // CANNOT GO IN THE CONSTRUCTOR OR IT MIGHT START BEFORE THE CHILD CLASS CONSTRUCTOR IS FULLY CONSTRUCTED
31  // THIS MIGHT RESULT IN THE CALL OF THE VIRTUAL TCPServerBase::acceptConnections
32  // startAccept();
33 }
34 
35 //==============================================================================
36 TCPServerBase::~TCPServerBase(void)
37 {
38  __COUT__ << "Shutting down accept for socket: " << getSocketId() << std::endl;
39  shutdownAccept();
40  while(fAcceptFuture.valid() && fAcceptFuture.wait_for(std::chrono::milliseconds(
41  100)) != std::future_status::ready)
42  {
43  __COUT__ << "Server accept still running" << std::endl;
44  shutdownAccept();
45  }
46  //__COUT__ << "Closing connected client sockets for socket: " << getSocketId() << std::endl;
47  closeClientSockets();
48  //__COUT__ << "Closed all sockets connected to server: " << getSocketId() << std::endl;
49 }
50 
51 //==============================================================================
52 void TCPServerBase::startAccept(void)
53 {
54  // __COUT__ << "Begin startAccept" << std::endl;
55  int opt = 1; // SO_REUSEADDR - man socket(7)
56  if(::setsockopt(getSocketId(), SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(int)) == -1)
57  {
58  close();
59  throw std::runtime_error(std::string("Setsockopt: ") + strerror(errno));
60  }
61 
62  struct sockaddr_in serverAddr;
63  bzero((char*)&serverAddr, sizeof(serverAddr));
64  serverAddr.sin_family = AF_INET;
65  serverAddr.sin_port = htons(fServerPort);
66  serverAddr.sin_addr.s_addr = INADDR_ANY;
67 
68  if(::bind(getSocketId(), (struct sockaddr*)&serverAddr, sizeof(serverAddr)) != 0)
69  {
70  close();
71  throw std::runtime_error(std::string("Bind: ") + strerror(errno));
72  }
73  // freeaddrinfo(serverAddr); // all done with this structure
74 
75  if(::listen(getSocketId(), fMaxConnectionBacklog) != 0)
76  {
77  close();
78  throw std::runtime_error(std::string("Listen: ") + strerror(errno));
79  }
80 
81  fAccept = true;
82  fAcceptFuture =
83  std::async(std::launch::async, &TCPServerBase::acceptConnections, this);
84  // __COUT__ << "Done startAccept" << std::endl;
85 }
86 
88 //==============================================================================
89 int TCPServerBase::accept(bool blocking)
90 {
91  __COUT__ << "Now server accept connections on socket: " << getSocketId() << std::endl;
92  if(getSocketId() == invalidSocketId)
93  {
94  throw std::logic_error(
95  "Accept called on a bad socket object (this object was moved)");
96  }
97 
98  struct sockaddr_storage clientAddress; // connector's address information
99  socklen_t clientAddressSize = sizeof(clientAddress);
100  int clientSocket = invalidSocketId;
101  if(blocking)
102  {
103  //__COUT__ << "Number of connected clients: " << fConnectedClients.size() << std::endl;
104  // clientSocket = ::accept4(getSocketId(),(struct sockaddr *)&clientAddress, &clientAddressSize, 0);
105  // unsigned counter = 0;
106  __COUT__ << "Client list on input:\n";
107  for(auto it = fConnectedClients.begin(); it != fConnectedClients.end(); it++)
108  {
109  __COUT__ << " --> Client: " << it->first << " : " << it->second << std::endl;
110  }
111  while(true)
112  {
113  clientSocket = ::accept(
114  getSocketId(), (struct sockaddr*)&clientAddress, &clientAddressSize);
115  __COUT__ << ": clientSocket returned = " << clientSocket << std::endl;
116 
117  //FIXME: Commenting out this line to avoid seg-fault in the case there are two clients connecting from the same process...
118  // pingActiveClients(); // This message is to check if there are clients that disconnected and, if so, they are removed from the client list
119  if(fAccept && fMaxNumberOfClients > 0 &&
120  fConnectedClients.size() >= fMaxNumberOfClients)
121  {
122  send(clientSocket, "Too many clients connected!", 27, 0);
123  ::shutdown(clientSocket, SHUT_WR);
124  continue;
125  }
126  break;
127  }
128  __COUT__ << "fAccept? " << fAccept << std::endl;
129  if(!fAccept)
130  {
131  throw E_SHUTDOWN;
132  }
133  else if(clientSocket == invalidSocketId)
134  {
135  __COUT__ << "New socket invalid?: " << clientSocket << " errno: " << errno
136  << std::endl;
137  throw std::runtime_error(std::string("Accept: ") + strerror(errno));
138  }
139 
140  __COUT__ << "Server just accepted a connection on socket: " << getSocketId()
141  << " Client socket: " << clientSocket << std::endl;
142  return clientSocket;
143  }
144  else
145  {
146  constexpr int sleepMSeconds = 5;
147  constexpr int timeoutSeconds = 0;
148  constexpr int timeoutUSeconds = 1000;
149  struct timeval timeout;
150  timeout.tv_sec = timeoutSeconds;
151  timeout.tv_usec = timeoutUSeconds;
152 
153  fd_set fdSet;
154 
155  while(fAccept)
156  {
157  FD_ZERO(&fdSet);
158  FD_SET(getSocketId(), &fdSet);
159  select(getSocketId() + 1, &fdSet, 0, 0, &timeout);
160 
161  if(FD_ISSET(getSocketId(), &fdSet))
162  {
163  struct sockaddr_in clientAddress;
164  socklen_t socketSize = sizeof(clientAddress);
165  // int newSocketFD = ::accept4(fdServerSocket_,(struct sockaddr*)&clientAddress,&socketSize, (pushOnly_ ? SOCK_NONBLOCK : 0));
166  clientSocket = ::accept(
167  getSocketId(),
168  (struct sockaddr*)&clientAddress,
169  &socketSize); // Blocking since select goes in timeout if there is nothing
170  if(clientSocket == invalidSocketId)
171  {
172  __COUT__ << "New socket invalid?: " << clientSocket
173  << " errno: " << errno << std::endl;
174  throw std::runtime_error(std::string("Accept: ") + strerror(errno));
175  }
176  return clientSocket;
177  }
178  std::this_thread::sleep_for(std::chrono::milliseconds(sleepMSeconds));
179  }
180  throw E_SHUTDOWN;
181  }
182 }
183 
184 //==============================================================================
186 void TCPServerBase::closeClientSockets(void)
187 {
188  for(auto& socket : fConnectedClients)
189  {
190  try
191  {
192  socket.second->sendClose();
193  }
194  catch(const std::exception& e)
195  {
196  // I can get here with the TCPPubishServer because it doesn't keep track of the clients that might have already disconnected
197  // Just do nothing!
198  __COUT__ << e.what() << '\n';
199  }
200 
201  auto clientThread = fConnectedClientsFuture.find(socket.first);
202  if(clientThread != fConnectedClientsFuture.end())
203  clientThread->second.wait(); // Waiting for client thread
204  delete socket.second;
205  }
206  fConnectedClients.clear();
207  fConnectedClientsFuture.clear();
208 }
209 
210 //==============================================================================
211 void TCPServerBase::closeClientSocket(int socket)
212 {
213  // This method is called inside the thread itself so it cannot call the removeClientSocketFuture!!!
214  auto it = fConnectedClients.find(socket);
215  if(it != fConnectedClients.end())
216  {
217  if(it->second->getSocketId() == socket)
218  {
219  try
220  {
221  if(it->second != nullptr)
222  it->second->sendClose();
223  }
224  catch(const std::exception& e)
225  {
226  // I can get here with the TCPPubishServer because it doesn't keep track of the clients that might have already disconnected
227  // Just do nothing!
228  __COUT__ << e.what() << '\n';
229  }
230  delete it->second;
231  fConnectedClients.erase(it);
232  }
233  else
234  {
235  throw std::runtime_error(std::string(
236  "SocketId in fConnectedClients != socketId in TCPSocket! Impossible!!!"));
237  }
238  }
239 }
240 
241 //==============================================================================
242 void TCPServerBase::broadcastPacket(const char* message, std::size_t length)
243 {
244  broadcastPacket(std::string(message, length));
245 }
246 
247 //==============================================================================
248 void TCPServerBase::broadcastPacket(const std::string& message)
249 {
250  for(auto it = fConnectedClients.begin(); it != fConnectedClients.end(); it++)
251  {
252  try
253  {
254  if(auto sock = dynamic_cast<TCPTransmitterSocket*>(it->second);
255  sock != nullptr)
256  sock->sendPacket(message);
257  }
258  catch(const std::exception& e)
259  {
260  // __COUT__ << "I don't think that this error is possible because I close the socket when I get disconnected...if you see this then you should
261  // contact Lorenzo Uplegger" << std::endl;
262  // __COUT__ << "This should only happen with the TCPSubscribeServer because it doesn't keep track of the connected clients..." << std::endl;
263  // __COUT__ << "Error: " << e.what() << std::endl;
264  if(fConnectedClientsFuture.find(it->first) != fConnectedClientsFuture.end())
265  fConnectedClientsFuture.erase(fConnectedClientsFuture.find(it->first));
266  delete it->second;
267  fConnectedClients.erase(it--);
268  }
269  }
270 }
271 
272 //========================================================================================================================
273 void TCPServerBase::broadcast(const char* message, std::size_t length)
274 {
275  // std::lock_guard<std::mutex> lock(clientsMutex_);
276  for(auto it = fConnectedClients.begin(); it != fConnectedClients.end(); it++)
277  {
278  try
279  {
280  if(auto sock = dynamic_cast<TCPTransmitterSocket*>(it->second);
281  sock != nullptr)
282  sock->send(message, length);
283  }
284  catch(const std::exception& e)
285  {
286  // __COUT__ << "I don't think that this error is possible because I close the socket when I get disconnected...if you see this then you should
287  // contact Lorenzo Uplegger" << std::endl;
288  // __COUT__ << "This should only happen with the TCPSubscribeServer because it doesn't keep track of the connected clients..." << std::endl;
289  // __COUT__ << "Error: " << e.what() << std::endl;
290  if(fConnectedClientsFuture.find(it->first) != fConnectedClientsFuture.end())
291  fConnectedClientsFuture.erase(fConnectedClientsFuture.find(it->first));
292  delete it->second;
293  fConnectedClients.erase(it--);
294  }
295  }
296 }
297 
298 //==============================================================================
299 void TCPServerBase::broadcast(const std::string& message)
300 {
301  for(auto it = fConnectedClients.begin(); it != fConnectedClients.end(); it++)
302  {
303  try
304  {
305  if(auto sock = dynamic_cast<TCPTransmitterSocket*>(it->second);
306  sock != nullptr)
307  sock->send(message);
308  }
309  catch(const std::exception& e)
310  {
311  // __COUT__ << "I don't think that this error is possible because I close the socket when I get disconnected...if you see this then you should
312  // contact Lorenzo Uplegger" << std::endl;
313  // __COUT__ << "This should only happen with the TCPSubscribeServer because it doesn't keep track of the connected clients..." << std::endl;
314  // __COUT__ << "Error: " << e.what() << std::endl;
315  if(fConnectedClientsFuture.find(it->first) != fConnectedClientsFuture.end())
316  fConnectedClientsFuture.erase(fConnectedClientsFuture.find(it->first));
317  delete it->second;
318  fConnectedClients.erase(it--);
319  }
320  }
321 }
322 
323 //==============================================================================
324 void TCPServerBase::broadcast(const std::vector<char>& message)
325 {
326  for(auto it = fConnectedClients.begin(); it != fConnectedClients.end(); it++)
327  {
328  try
329  {
330  if(auto sock = dynamic_cast<TCPTransmitterSocket*>(it->second);
331  sock != nullptr)
332  sock->send(message);
333  }
334  catch(const std::exception& e)
335  {
336  // __COUT__ << "I don't think that this error is possible because I close the socket when I get disconnected...if you see this then you should
337  // contact Lorenzo Uplegger" << std::endl;
338  // __COUT__ << "This should only happen with the TCPSubscribeServer because it doesn't keep track of the connected clients..." << std::endl;
339  // __COUT__ << "Error: " << e.what() << std::endl;
340  if(fConnectedClientsFuture.find(it->first) != fConnectedClientsFuture.end())
341  fConnectedClientsFuture.erase(fConnectedClientsFuture.find(it->first));
342  delete it->second;
343  fConnectedClients.erase(it--);
344  }
345  }
346 }
347 
348 //==============================================================================
349 void TCPServerBase::broadcast(const std::vector<uint16_t>& message)
350 {
351  for(auto it = fConnectedClients.begin(); it != fConnectedClients.end(); it++)
352  {
353  try
354  {
355  if(auto sock = dynamic_cast<TCPTransmitterSocket*>(it->second);
356  sock != nullptr)
357  sock->send(message);
358  }
359  catch(const std::exception& e)
360  {
361  // __COUT__ << "This should only happen with the TCPSubscribeServer because it doesn't keep track of the connected clients..." << std::endl;
362  __COUT__ << "Error: " << e.what() << std::endl;
363  if(fConnectedClientsFuture.find(it->first) != fConnectedClientsFuture.end())
364  {
365  __COUT__ << "Removing client entry from future connected clients list\n";
366  fConnectedClientsFuture.erase(fConnectedClientsFuture.find(it->first));
367  }
368  __COUT__ << "Removing client entry from connected clients list\n";
369  delete it->second;
370  fConnectedClients.erase(it--);
371  }
372  }
373 }
374 
375 //==============================================================================
376 void TCPServerBase::pingActiveClients()
377 {
378  for(auto it = fConnectedClients.begin(); it != fConnectedClients.end(); it++)
379  {
380  __COUT__ << "Pinging client " << it->first << " : " << it->second << std::endl;
381  try
382  {
383  if(auto sock = dynamic_cast<TCPTransmitterSocket*>(it->second);
384  sock != nullptr)
385  sock->send("", 0, true);
386  }
387  catch(const std::exception& e)
388  {
389  // __COUT__ << "I don't think that this error is possible because I close the socket when I get disconnected...if you see this then you should
390  // contact Lorenzo Uplegger" << std::endl;
391  // __COUT__ << "This should only happen with the TCPSubscribeServer because it doesn't keep track of the connected clients..." << std::endl;
392  __COUT__ << "Error: " << e.what() << std::endl;
393  if(fConnectedClientsFuture.find(it->first) != fConnectedClientsFuture.end())
394  {
395  __COUT__ << "Removing client entry from future connected clients list\n";
396  fConnectedClientsFuture.erase(fConnectedClientsFuture.find(it->first));
397  }
398  __COUT__ << "Removing client entry from connected clients list\n";
399  delete it->second;
400  fConnectedClients.erase(it--);
401  }
402  }
403 }
404 
405 //==============================================================================
406 void TCPServerBase::shutdownAccept()
407 {
408  fAccept = false;
409  shutdown(getSocketId(), SHUT_RD);
410 }
TCPServerBase(unsigned int serverPort, unsigned int maxNumberOfClients=0)
Means as many unsigned allows.
A class that can write to a socket.