4 #include "otsdaq/NetworkUtilities/TCPServerBase.h"
5 #include "otsdaq/Macros/CoutMacros.h"
6 #include "otsdaq/NetworkUtilities/TCPTransmitterSocket.h"
10 #include <arpa/inet.h>
13 #include <sys/socket.h>
25 : fMaxNumberOfClients(maxNumberOfClients), fServerPort(serverPort), fAccept(true)
28 if(fMaxNumberOfClients == 0)
29 fMaxNumberOfClients = (unsigned)-1;
36 TCPServerBase::~TCPServerBase(
void)
38 __COUT__ <<
"Shutting down accept for socket: " << getSocketId() << std::endl;
40 while(fAcceptFuture.valid() && fAcceptFuture.wait_for(std::chrono::milliseconds(
41 100)) != std::future_status::ready)
43 __COUT__ <<
"Server accept still running" << std::endl;
52 void TCPServerBase::startAccept(
void)
54 struct sockaddr_in checkAddr;
55 socklen_t addrLen =
sizeof(checkAddr);
56 if(::getsockname(getSocketId(), (
struct sockaddr*)&checkAddr, &addrLen) == 0 &&
57 checkAddr.sin_port != 0)
59 __COUT__ <<
"Socket " << getSocketId() <<
" is already bound to port "
60 << ntohs(checkAddr.sin_port) << std::endl;
66 if(::setsockopt(getSocketId(), SOL_SOCKET, SO_REUSEADDR, &opt,
sizeof(
int)) == -1)
69 __SS__ <<
"Setsockopt: " << strerror(errno) << __E__;
73 struct sockaddr_in serverAddr;
74 bzero((
char*)&serverAddr,
sizeof(serverAddr));
75 serverAddr.sin_family = AF_INET;
76 serverAddr.sin_port = htons(fServerPort);
77 serverAddr.sin_addr.s_addr = INADDR_ANY;
79 if(::bind(getSocketId(), (
struct sockaddr*)&serverAddr,
sizeof(serverAddr)) != 0)
82 __SS__ <<
"Bind: " << strerror(errno) << __E__;
87 if(::listen(getSocketId(), fMaxConnectionBacklog) != 0)
90 __SS__ <<
"Listen: " << strerror(errno) << __E__;
96 std::async(std::launch::async, &TCPServerBase::acceptConnections,
this);
102 int TCPServerBase::accept(
bool blocking)
104 __COUT__ <<
"Now server accept connections on socket: " << getSocketId() << std::endl;
105 if(getSocketId() == invalidSocketId)
107 __SS__ <<
"Accept called on a bad socket object (this object was moved)" << __E__;
111 struct sockaddr_storage clientAddress;
112 socklen_t clientAddressSize =
sizeof(clientAddress);
113 int clientSocket = invalidSocketId;
119 __COUT__ <<
"Client list on input:\n";
120 for(
auto it = fConnectedClients.begin(); it != fConnectedClients.end(); it++)
122 __COUT__ <<
" --> Client: " << it->first <<
" : " << it->second << std::endl;
126 clientSocket = ::accept(
127 getSocketId(), (
struct sockaddr*)&clientAddress, &clientAddressSize);
128 __COUT__ <<
": clientSocket returned = " << clientSocket << std::endl;
132 if(fAccept && fMaxNumberOfClients > 0 &&
133 fConnectedClients.size() >= fMaxNumberOfClients)
135 send(clientSocket,
"Too many clients connected!", 27, 0);
136 ::shutdown(clientSocket, SHUT_WR);
141 __COUT__ <<
"fAccept? " << fAccept << std::endl;
146 else if(clientSocket == invalidSocketId)
148 __COUT__ <<
"New socket invalid?: " << clientSocket <<
" errno: " << errno
150 __SS__ <<
"Accept: " << strerror(errno) << __E__;
154 __COUT__ <<
"Server just accepted a connection on socket: " << getSocketId()
155 <<
" Client socket: " << clientSocket << std::endl;
160 constexpr
int sleepMSeconds = 5;
161 constexpr
int timeoutSeconds = 0;
162 constexpr
int timeoutUSeconds = 1000;
163 struct timeval timeout;
164 timeout.tv_sec = timeoutSeconds;
165 timeout.tv_usec = timeoutUSeconds;
172 FD_SET(getSocketId(), &fdSet);
173 select(getSocketId() + 1, &fdSet, 0, 0, &timeout);
175 if(FD_ISSET(getSocketId(), &fdSet))
177 struct sockaddr_in clientAddress;
178 socklen_t socketSize =
sizeof(clientAddress);
180 clientSocket = ::accept(
182 (
struct sockaddr*)&clientAddress,
184 if(clientSocket == invalidSocketId)
186 __COUT__ <<
"New socket invalid?: " << clientSocket
187 <<
" errno: " << errno << std::endl;
188 __SS__ <<
"Accept: " << strerror(errno) << __E__;
193 std::this_thread::sleep_for(std::chrono::milliseconds(sleepMSeconds));
201 void TCPServerBase::closeClientSockets(
void)
203 for(
auto& socket : fConnectedClients)
207 socket.second->sendClose();
209 catch(
const std::exception& e)
213 __COUT__ << e.what() <<
'\n';
216 auto clientThread = fConnectedClientsFuture.find(socket.first);
217 if(clientThread != fConnectedClientsFuture.end())
218 clientThread->second.wait();
219 delete socket.second;
221 fConnectedClients.clear();
222 fConnectedClientsFuture.clear();
226 void TCPServerBase::closeClientSocket(
int socket)
229 auto it = fConnectedClients.find(socket);
230 if(it != fConnectedClients.end())
232 if(it->second->getSocketId() == socket)
236 if(it->second !=
nullptr)
237 it->second->sendClose();
239 catch(
const std::exception& e)
243 __COUT__ << e.what() <<
'\n';
246 fConnectedClients.erase(it);
251 <<
"SocketId in fConnectedClients != socketId in TCPSocket! Impossible!!!"
259 void TCPServerBase::broadcastPacket(
const char* message, std::size_t length)
261 broadcastPacket(std::string(message, length));
265 void TCPServerBase::broadcastPacket(
const std::string& message)
267 for(
auto it = fConnectedClients.begin(); it != fConnectedClients.end(); it++)
273 sock->sendPacket(message);
275 catch(
const std::exception& e)
281 if(fConnectedClientsFuture.find(it->first) != fConnectedClientsFuture.end())
282 fConnectedClientsFuture.erase(fConnectedClientsFuture.find(it->first));
284 fConnectedClients.erase(it--);
290 void TCPServerBase::broadcast(
const char* message, std::size_t length)
293 for(
auto it = fConnectedClients.begin(); it != fConnectedClients.end(); it++)
299 sock->send(message, length);
301 catch(
const std::exception& e)
307 if(fConnectedClientsFuture.find(it->first) != fConnectedClientsFuture.end())
308 fConnectedClientsFuture.erase(fConnectedClientsFuture.find(it->first));
310 fConnectedClients.erase(it--);
316 void TCPServerBase::broadcast(
const std::string& message)
318 for(
auto it = fConnectedClients.begin(); it != fConnectedClients.end(); it++)
326 catch(
const std::exception& e)
332 if(fConnectedClientsFuture.find(it->first) != fConnectedClientsFuture.end())
333 fConnectedClientsFuture.erase(fConnectedClientsFuture.find(it->first));
335 fConnectedClients.erase(it--);
341 void TCPServerBase::broadcast(
const std::vector<char>& message)
343 for(
auto it = fConnectedClients.begin(); it != fConnectedClients.end(); it++)
351 catch(
const std::exception& e)
357 if(fConnectedClientsFuture.find(it->first) != fConnectedClientsFuture.end())
358 fConnectedClientsFuture.erase(fConnectedClientsFuture.find(it->first));
360 fConnectedClients.erase(it--);
366 void TCPServerBase::broadcast(
const std::vector<uint16_t>& message)
368 for(
auto it = fConnectedClients.begin(); it != fConnectedClients.end(); it++)
376 catch(
const std::exception& e)
379 __COUT__ <<
"Error: " << e.what() << std::endl;
380 if(fConnectedClientsFuture.find(it->first) != fConnectedClientsFuture.end())
382 __COUT__ <<
"Removing client entry from future connected clients list\n";
383 fConnectedClientsFuture.erase(fConnectedClientsFuture.find(it->first));
385 __COUT__ <<
"Removing client entry from connected clients list\n";
387 fConnectedClients.erase(it--);
393 void TCPServerBase::pingActiveClients()
395 for(
auto it = fConnectedClients.begin(); it != fConnectedClients.end(); it++)
397 __COUT__ <<
"Pinging client " << it->first <<
" : " << it->second << std::endl;
402 sock->send(
"", 0,
true);
404 catch(
const std::exception& e)
409 __COUT__ <<
"Error: " << e.what() << std::endl;
410 if(fConnectedClientsFuture.find(it->first) != fConnectedClientsFuture.end())
412 __COUT__ <<
"Removing client entry from future connected clients list\n";
413 fConnectedClientsFuture.erase(fConnectedClientsFuture.find(it->first));
415 __COUT__ <<
"Removing client entry from connected clients list\n";
417 fConnectedClients.erase(it--);
423 void TCPServerBase::shutdownAccept()
426 shutdown(getSocketId(), SHUT_RD);
TCPServerBase(unsigned int serverPort, unsigned int maxNumberOfClients=0)
Means as many unsigned allows.
A class that can write to a socket.