4 #include "otsdaq/NetworkUtilities/TCPServerBase.h"
5 #include "otsdaq/Macros/CoutMacros.h"
6 #include "otsdaq/NetworkUtilities/TCPTransmitterSocket.h"
10 #include <arpa/inet.h>
13 #include <sys/socket.h>
// Constructor initializer list: records the client limit and the server port,
// and starts with fAccept set to true.
25 : fMaxNumberOfClients(maxNumberOfClients), fServerPort(serverPort), fAccept(true)
// A limit of 0 is replaced by the largest unsigned value, so the
// "too many clients" comparison in accept() is effectively never reached.
28 if(fMaxNumberOfClients == 0)
29 fMaxNumberOfClients = (unsigned)-1;
// Destructor: logs that the accept side of this server socket is being shut
// down, then waits for the asynchronous accept task (fAcceptFuture) to finish.
36 TCPServerBase::~TCPServerBase(
void)
38 __COUT__ <<
"Shutting down accept for socket: " << getSocketId() << std::endl;
// Poll in 100 ms slices for as long as fAcceptFuture is valid and not yet
// ready, logging a message on every pass.
40 while(fAcceptFuture.valid() && fAcceptFuture.wait_for(std::chrono::milliseconds(
41 100)) != std::future_status::ready)
43 __COUT__ <<
"Server accept still running" << std::endl;
// Configures the server socket (SO_REUSEADDR, bind, listen) and then launches
// acceptConnections() on this object asynchronously.
// Throws std::runtime_error carrying strerror(errno) if setsockopt, bind or
// listen fails.
52 void TCPServerBase::startAccept(
void)
// Set SO_REUSEADDR on the listening socket using the value in `opt`.
56 if(::setsockopt(getSocketId(), SOL_SOCKET, SO_REUSEADDR, &opt,
sizeof(
int)) == -1)
59 throw std::runtime_error(std::string(
"Setsockopt: ") + strerror(errno));
// Build an IPv4 address: any local interface (INADDR_ANY), port fServerPort
// converted to network byte order.
62 struct sockaddr_in serverAddr;
63 bzero((
char*)&serverAddr,
sizeof(serverAddr));
64 serverAddr.sin_family = AF_INET;
65 serverAddr.sin_port = htons(fServerPort);
66 serverAddr.sin_addr.s_addr = INADDR_ANY;
68 if(::bind(getSocketId(), (
struct sockaddr*)&serverAddr,
sizeof(serverAddr)) != 0)
71 throw std::runtime_error(std::string(
"Bind: ") + strerror(errno));
// Start listening; fMaxConnectionBacklog is passed as the listen() backlog.
75 if(::listen(getSocketId(), fMaxConnectionBacklog) != 0)
78 throw std::runtime_error(std::string(
"Listen: ") + strerror(errno));
// Run acceptConnections() with the std::launch::async policy.
// NOTE(review): the future returned by std::async has to be kept (presumably in
// fAcceptFuture, which the destructor waits on) - a discarded std::async future
// blocks in its own destructor until the task completes. Confirm it is stored.
83 std::async(std::launch::async, &TCPServerBase::acceptConnections,
this);
// Accepts a client connection on the server socket.
// Two code paths are visible below: a direct ::accept() call, and a
// select()-based poll with a short timeout followed by ::accept(). Presumably
// the `blocking` argument chooses between them - TODO confirm.
// Throws std::logic_error if this object no longer owns a valid socket, and
// std::runtime_error (with strerror(errno)) when ::accept() returns an invalid
// descriptor.
89 int TCPServerBase::accept(
bool blocking)
91 __COUT__ <<
"Now server accept connections on socket: " << getSocketId() << std::endl;
// Guard: refuse to work on an invalid socket id.
92 if(getSocketId() == invalidSocketId)
94 throw std::logic_error(
95 "Accept called on a bad socket object (this object was moved)");
// sockaddr_storage is used for the peer address; its size is passed to accept.
98 struct sockaddr_storage clientAddress;
99 socklen_t clientAddressSize =
sizeof(clientAddress);
100 int clientSocket = invalidSocketId;
// Debug dump of the currently connected clients (key and stored pointer).
106 __COUT__ <<
"Client list on input:\n";
107 for(
auto it = fConnectedClients.begin(); it != fConnectedClients.end(); it++)
109 __COUT__ <<
" --> Client: " << it->first <<
" : " << it->second << std::endl;
// Direct accept on the listening socket.
113 clientSocket = ::accept(
114 getSocketId(), (
struct sockaddr*)&clientAddress, &clientAddressSize);
115 __COUT__ <<
": clientSocket returned = " << clientSocket << std::endl;
// When accepting is enabled and the client limit has been reached, tell the
// new peer (27 bytes, the message without its terminating NUL) and shut down
// the write side of its socket.
// NOTE(review): this branch runs before the invalid-descriptor check below, so
// send() may be called with clientSocket == invalidSocketId - verify.
119 if(fAccept && fMaxNumberOfClients > 0 &&
120 fConnectedClients.size() >= fMaxNumberOfClients)
122 send(clientSocket,
"Too many clients connected!", 27, 0);
123 ::shutdown(clientSocket, SHUT_WR);
128 __COUT__ <<
"fAccept? " << fAccept << std::endl;
// Failure path: ::accept() did not return a usable descriptor.
133 else if(clientSocket == invalidSocketId)
135 __COUT__ <<
"New socket invalid?: " << clientSocket <<
" errno: " << errno
137 throw std::runtime_error(std::string(
"Accept: ") + strerror(errno));
140 __COUT__ <<
"Server just accepted a connection on socket: " << getSocketId()
141 <<
" Client socket: " << clientSocket << std::endl;
// Polling path parameters: select() waits at most 0 s + 1000 us, and the
// code sleeps 5 ms after each attempt.
146 constexpr
int sleepMSeconds = 5;
147 constexpr
int timeoutSeconds = 0;
148 constexpr
int timeoutUSeconds = 1000;
149 struct timeval timeout;
150 timeout.tv_sec = timeoutSeconds;
151 timeout.tv_usec = timeoutUSeconds;
// Wait for the listening socket to become readable.
// NOTE(review): select()'s return value is not checked here, and select() may
// modify `timeout`; confirm fdSet and timeout are re-initialised before every
// call if this runs in a loop.
158 FD_SET(getSocketId(), &fdSet);
159 select(getSocketId() + 1, &fdSet, 0, 0, &timeout);
// Only call ::accept() when select() flagged the listening socket as readable.
161 if(FD_ISSET(getSocketId(), &fdSet))
163 struct sockaddr_in clientAddress;
164 socklen_t socketSize =
sizeof(clientAddress);
166 clientSocket = ::accept(
168 (
struct sockaddr*)&clientAddress,
170 if(clientSocket == invalidSocketId)
172 __COUT__ <<
"New socket invalid?: " << clientSocket
173 <<
" errno: " << errno << std::endl;
174 throw std::runtime_error(std::string(
"Accept: ") + strerror(errno));
// Back off for sleepMSeconds milliseconds.
178 std::this_thread::sleep_for(std::chrono::milliseconds(sleepMSeconds));
// Closes every connected client: sends a close to each one, waits for the
// client's future (if one is registered under the same key), deletes the
// socket object, and finally empties both bookkeeping containers.
186 void TCPServerBase::closeClientSockets(
void)
188 for(
auto& socket : fConnectedClients)
// A std::exception raised by sendClose() is caught and only logged, so one
// failing client does not stop the cleanup of the others.
192 socket.second->sendClose();
194 catch(
const std::exception& e)
198 __COUT__ << e.what() <<
'\n';
// Wait for the future stored under this client's key before deleting the
// socket object.
201 auto clientThread = fConnectedClientsFuture.find(socket.first);
202 if(clientThread != fConnectedClientsFuture.end())
203 clientThread->second.wait();
204 delete socket.second;
// All socket objects were deleted above; drop the now-stale entries.
206 fConnectedClients.clear();
207 fConnectedClientsFuture.clear();
// Closes a single client identified by its socket id: looks the id up in
// fConnectedClients, sends a close to the stored socket object and erases the
// entry. Throws std::runtime_error with the "Impossible" message below
// (presumably when the stored object's id differs from its key - confirm).
211 void TCPServerBase::closeClientSocket(
int socket)
214 auto it = fConnectedClients.find(socket);
215 if(it != fConnectedClients.end())
// Consistency check between the container key and the socket object's own id.
// NOTE(review): it->second is dereferenced here before the nullptr test that
// follows, so that test cannot protect this call.
217 if(it->second->getSocketId() == socket)
221 if(it->second !=
nullptr)
222 it->second->sendClose();
// A std::exception from sendClose() is logged and otherwise ignored.
224 catch(
const std::exception& e)
228 __COUT__ << e.what() <<
'\n';
231 fConnectedClients.erase(it);
235 throw std::runtime_error(std::string(
236 "SocketId in fConnectedClients != socketId in TCPSocket! Impossible!!!"));
// Convenience overload: copies `length` bytes starting at `message` into a
// std::string and forwards to broadcastPacket(const std::string&).
242 void TCPServerBase::broadcastPacket(
const char* message, std::size_t length)
244 broadcastPacket(std::string(message, length));
// Sends `message` with sendPacket() to every entry of fConnectedClients.
// A client whose send raises std::exception is removed from both
// fConnectedClientsFuture and fConnectedClients.
248 void TCPServerBase::broadcastPacket(
const std::string& message)
250 for(
auto it = fConnectedClients.begin(); it != fConnectedClients.end(); it++)
// `sock` presumably refers to the socket object of the current entry - confirm.
256 sock->sendPacket(message);
258 catch(
const std::exception& e)
// Drop the client's future entry, if one exists under the same key.
264 if(fConnectedClientsFuture.find(it->first) != fConnectedClientsFuture.end())
265 fConnectedClientsFuture.erase(fConnectedClientsFuture.find(it->first));
// erase(it--) erases the current entry and steps back so the loop's it++ lands
// on the following one.
// NOTE(review): when the failing client is the first element, it-- decrements
// begin(), which standard containers do not allow; `it = erase(it)` with a
// conditional increment avoids this.
267 fConnectedClients.erase(it--);
// Sends `length` bytes starting at `message` with send() to every entry of
// fConnectedClients. A client whose send raises std::exception is removed from
// both fConnectedClientsFuture and fConnectedClients.
273 void TCPServerBase::broadcast(
const char* message, std::size_t length)
276 for(
auto it = fConnectedClients.begin(); it != fConnectedClients.end(); it++)
// `sock` presumably refers to the socket object of the current entry - confirm.
282 sock->send(message, length);
284 catch(
const std::exception& e)
// Drop the client's future entry, if one exists under the same key.
290 if(fConnectedClientsFuture.find(it->first) != fConnectedClientsFuture.end())
291 fConnectedClientsFuture.erase(fConnectedClientsFuture.find(it->first));
// NOTE(review): erase(it--) decrements begin() when the first element is the
// one being removed; `it = erase(it)` with a conditional increment avoids this.
293 fConnectedClients.erase(it--);
// std::string overload of broadcast(): walks fConnectedClients (presumably
// sending `message` to each client, like the const char* overload - confirm)
// and, when std::exception is caught, removes the client from both
// fConnectedClientsFuture and fConnectedClients.
299 void TCPServerBase::broadcast(
const std::string& message)
301 for(
auto it = fConnectedClients.begin(); it != fConnectedClients.end(); it++)
309 catch(
const std::exception& e)
// Drop the client's future entry, if one exists under the same key.
315 if(fConnectedClientsFuture.find(it->first) != fConnectedClientsFuture.end())
316 fConnectedClientsFuture.erase(fConnectedClientsFuture.find(it->first));
// NOTE(review): erase(it--) decrements begin() when the first element is the
// one being removed; `it = erase(it)` with a conditional increment avoids this.
318 fConnectedClients.erase(it--);
// std::vector<char> overload of broadcast(): walks fConnectedClients
// (presumably sending `message` to each client, like the const char* overload -
// confirm) and, when std::exception is caught, removes the client from both
// fConnectedClientsFuture and fConnectedClients.
324 void TCPServerBase::broadcast(
const std::vector<char>& message)
326 for(
auto it = fConnectedClients.begin(); it != fConnectedClients.end(); it++)
334 catch(
const std::exception& e)
// Drop the client's future entry, if one exists under the same key.
340 if(fConnectedClientsFuture.find(it->first) != fConnectedClientsFuture.end())
341 fConnectedClientsFuture.erase(fConnectedClientsFuture.find(it->first));
// NOTE(review): erase(it--) decrements begin() when the first element is the
// one being removed; `it = erase(it)` with a conditional increment avoids this.
343 fConnectedClients.erase(it--);
// std::vector<uint16_t> overload of broadcast(): walks fConnectedClients
// (presumably sending `message` to each client, like the const char* overload -
// confirm). When std::exception is caught, the error is logged and the client
// is removed from both fConnectedClientsFuture and fConnectedClients.
349 void TCPServerBase::broadcast(
const std::vector<uint16_t>& message)
351 for(
auto it = fConnectedClients.begin(); it != fConnectedClients.end(); it++)
359 catch(
const std::exception& e)
362 __COUT__ <<
"Error: " << e.what() << std::endl;
// Drop the client's future entry, if one exists under the same key.
363 if(fConnectedClientsFuture.find(it->first) != fConnectedClientsFuture.end())
365 __COUT__ <<
"Removing client entry from future connected clients list\n";
366 fConnectedClientsFuture.erase(fConnectedClientsFuture.find(it->first));
368 __COUT__ <<
"Removing client entry from connected clients list\n";
// NOTE(review): erase(it--) decrements begin() when the first element is the
// one being removed; `it = erase(it)` with a conditional increment avoids this.
370 fConnectedClients.erase(it--);
// Probes every entry of fConnectedClients by sending an empty payload
// (send("", 0, true)). A client whose send raises std::exception is logged and
// removed from both fConnectedClientsFuture and fConnectedClients.
376 void TCPServerBase::pingActiveClients()
378 for(
auto it = fConnectedClients.begin(); it != fConnectedClients.end(); it++)
380 __COUT__ <<
"Pinging client " << it->first <<
" : " << it->second << std::endl;
// Zero-length send; the meaning of the third argument (true) is defined by the
// socket class - TODO confirm.
385 sock->send(
"", 0,
true);
387 catch(
const std::exception& e)
392 __COUT__ <<
"Error: " << e.what() << std::endl;
// Drop the client's future entry, if one exists under the same key.
393 if(fConnectedClientsFuture.find(it->first) != fConnectedClientsFuture.end())
395 __COUT__ <<
"Removing client entry from future connected clients list\n";
396 fConnectedClientsFuture.erase(fConnectedClientsFuture.find(it->first));
398 __COUT__ <<
"Removing client entry from connected clients list\n";
// NOTE(review): erase(it--) decrements begin() when the first element is the
// one being removed; `it = erase(it)` with a conditional increment avoids this.
400 fConnectedClients.erase(it--);
// Shuts down the receive direction (SHUT_RD) of the server socket.
// Presumably this is what makes a pending accept on that socket return so the
// accept task can finish - TODO confirm.
406 void TCPServerBase::shutdownAccept()
409 shutdown(getSocketId(), SHUT_RD);
TCPServerBase(unsigned int serverPort, unsigned int maxNumberOfClients=0)
Means as many as an unsigned int allows.
A class that can write to a socket.