4 #include "otsdaq/NetworkUtilities/TCPServerBase.h"
5 #include "otsdaq/Macros/CoutMacros.h"
6 #include "otsdaq/NetworkUtilities/TCPTransmitterSocket.h"
10 #include <arpa/inet.h>
13 #include <sys/socket.h>
// Constructor initializer list and visible body (the signature line is not part
// of this listing). Stores the requested client limit and server port and sets
// fAccept to true.
25 : fMaxNumberOfClients(maxNumberOfClients), fServerPort(serverPort), fAccept(true)
// A limit of 0 is remapped to (unsigned)-1, the largest unsigned value, so that
// passing 0 behaves as "no practical limit" in later size comparisons.
28 if(fMaxNumberOfClients == 0)
29 fMaxNumberOfClients = (unsigned)-1;
// Destructor: logs the listening socket id, then polls fAcceptFuture in 100 ms
// steps until the accept task reports ready, logging on every timeout.
// NOTE(review): this listing skips some original lines (the inner numbering
// jumps 38->40 and 43->52). Whatever unblocks the accept task and releases the
// connected clients is presumably on those lines -- confirm, because the visible
// loop by itself would never terminate if the task keeps running.
36 TCPServerBase::~TCPServerBase(
void)
38 __COUT__ <<
"Shutting down accept for socket: " << getSocketId() << std::endl;
// valid() is tested first so wait_for() is never called on a future that has
// no shared state.
40 while(fAcceptFuture.valid() && fAcceptFuture.wait_for(std::chrono::milliseconds(
41 100)) != std::future_status::ready)
43 __COUT__ <<
"Server accept still running" << std::endl;
// Prepares the listening socket and starts the asynchronous accept task.
// Visible steps: report if the socket already has a port, enable SO_REUSEADDR,
// bind to fServerPort on INADDR_ANY (IPv4), listen with fMaxConnectionBacklog,
// then launch acceptConnections() via std::async.
// Each failing system call formats a message containing strerror(errno) through
// the __SS__ macro; how that message is raised is on lines not shown here.
52 void TCPServerBase::startAccept(
void)
// Ask the kernel which local address the socket currently has; a non-zero port
// means it has already been bound.
54 struct sockaddr_in checkAddr;
55 socklen_t addrLen =
sizeof(checkAddr);
56 if(::getsockname(getSocketId(), (
struct sockaddr*)&checkAddr, &addrLen) == 0 &&
57 checkAddr.sin_port != 0)
59 __COUT__ <<
"Socket " << getSocketId() <<
" is already bound to port "
60 << ntohs(checkAddr.sin_port) << std::endl;
// NOTE(review): what happens after this log (early return or fall-through to
// bind) is not visible in this listing -- confirm.
// Set SO_REUSEADDR on the listening socket; `opt` is declared on a line that is
// not part of this listing.
66 if(::setsockopt(getSocketId(), SOL_SOCKET, SO_REUSEADDR, &opt,
sizeof(
int)) == -1)
69 __SS__ <<
"Setsockopt: " << strerror(errno) << __E__;
// Zero the address structure, then fill it for IPv4 / any local interface /
// fServerPort in network byte order.
73 struct sockaddr_in serverAddr;
74 bzero((
char*)&serverAddr,
sizeof(serverAddr));
75 serverAddr.sin_family = AF_INET;
76 serverAddr.sin_port = htons(fServerPort);
77 serverAddr.sin_addr.s_addr = INADDR_ANY;
79 if(::bind(getSocketId(), (
struct sockaddr*)&serverAddr,
sizeof(serverAddr)) != 0)
82 __SS__ <<
"Bind to port " << std::to_string(fServerPort) <<
": "
83 << strerror(errno) << __E__;
// Start listening; fMaxConnectionBacklog is the pending-connection queue length
// passed to listen().
88 if(::listen(getSocketId(), fMaxConnectionBacklog) != 0)
91 __SS__ <<
"Listen on port " << std::to_string(fServerPort) <<
": "
92 << strerror(errno) << __E__;
// Run acceptConnections() on its own thread.
// NOTE(review): the future returned by std::async(std::launch::async, ...)
// blocks in its destructor until the task finishes, so it must be stored.
// Presumably it is assigned to fAcceptFuture on the preceding line, which is
// missing from this listing -- confirm.
98 std::async(std::launch::async, &TCPServerBase::acceptConnections,
this);
// Accepts one incoming client connection on the listening socket and yields the
// new client descriptor (the return statements are on lines not shown here).
// Two code paths are visible: a direct ::accept() call, and a select()-guarded
// ::accept() with a 1000 us select timeout and a 5 ms sleep. Presumably the
// `blocking` argument chooses between them -- the branch itself is not part of
// this listing, so confirm.
104 int TCPServerBase::accept(
bool blocking)
106 __COUT__ <<
"Now server accept connections on socket: " << getSocketId() << std::endl;
// Guard against being called on an object whose socket id is invalidSocketId;
// the error text attributes this to the object having been moved from.
107 if(getSocketId() == invalidSocketId)
109 __SS__ <<
"Accept called on a bad socket object (this object was moved)" << __E__;
// sockaddr_storage is large enough for any address family returned by accept().
113 struct sockaddr_storage clientAddress;
114 socklen_t clientAddressSize =
sizeof(clientAddress);
115 int clientSocket = invalidSocketId;
// Diagnostic dump of the currently tracked clients (socket id and object pointer).
121 __COUT__ <<
"Client list on input:\n";
122 for(
auto it = fConnectedClients.begin(); it != fConnectedClients.end(); it++)
124 __COUT__ <<
" --> Client: " << it->first <<
" : " << it->second << std::endl;
// Direct accept() on the listening socket.
128 clientSocket = ::accept(
129 getSocketId(), (
struct sockaddr*)&clientAddress, &clientAddressSize);
130 __COUT__ <<
": clientSocket returned = " << clientSocket << std::endl;
// Client limit reached: send a fixed 27-byte notice (the literal's length
// without the terminating NUL) and shut down the write side of the new socket.
// NOTE(review): this test comes before the `else if(clientSocket ==
// invalidSocketId)` check below, so if ::accept() failed while the limit is
// reached, send()/shutdown() would be called on an invalid descriptor. No
// close() of the rejected socket is visible in this listing either -- confirm
// against the elided lines.
134 if(fAccept && fMaxNumberOfClients > 0 &&
135 fConnectedClients.size() >= fMaxNumberOfClients)
137 send(clientSocket,
"Too many clients connected!", 27, 0);
138 ::shutdown(clientSocket, SHUT_WR);
143 __COUT__ <<
"fAccept? " << fAccept << std::endl;
// accept() failure: log the descriptor and errno, then format the error text.
148 else if(clientSocket == invalidSocketId)
150 __COUT__ <<
"New socket invalid?: " << clientSocket <<
" errno: " << errno
152 __SS__ <<
"Accept: " << strerror(errno) << __E__;
156 __COUT__ <<
"Server just accepted a connection on socket: " << getSocketId()
157 <<
" Client socket: " << clientSocket << std::endl;
// select()-guarded path: wait up to timeoutSeconds + timeoutUSeconds (0 s +
// 1000 us) for the listening socket to become readable.
162 constexpr
int sleepMSeconds = 5;
163 constexpr
int timeoutSeconds = 0;
164 constexpr
int timeoutUSeconds = 1000;
165 struct timeval timeout;
166 timeout.tv_sec = timeoutSeconds;
167 timeout.tv_usec = timeoutUSeconds;
// NOTE(review): select() may modify `timeout` on return (it does on Linux). If
// the lines below run inside a loop, the timeval should be re-initialised before
// each call -- the loop structure is not visible here, so confirm.
// The declaration/zeroing of fdSet is on lines not shown in this listing.
174 FD_SET(getSocketId(), &fdSet);
// select()'s return value is not inspected on this line; only FD_ISSET is
// consulted afterwards.
175 select(getSocketId() + 1, &fdSet, 0, 0, &timeout);
177 if(FD_ISSET(getSocketId(), &fdSet))
// Inner clientAddress is a distinct sockaddr_in local to this branch.
179 struct sockaddr_in clientAddress;
180 socklen_t socketSize =
sizeof(clientAddress);
182 clientSocket = ::accept(
184 (
struct sockaddr*)&clientAddress,
186 if(clientSocket == invalidSocketId)
188 __COUT__ <<
"New socket invalid?: " << clientSocket
189 <<
" errno: " << errno << std::endl;
190 __SS__ <<
"Accept: " << strerror(errno) << __E__;
// Back off for sleepMSeconds (5 ms) before the next attempt.
195 std::this_thread::sleep_for(std::chrono::milliseconds(sleepMSeconds));
// Closes and releases every tracked client connection.
// For each entry of fConnectedClients: call sendClose() on the socket object
// (a std::exception raised there is only logged), wait for the matching entry
// in fConnectedClientsFuture if one exists, then delete the socket object.
// Finally both containers are cleared.
203 void TCPServerBase::closeClientSockets(
void)
205 for(
auto& socket : fConnectedClients)
209 socket.second->sendClose();
211 catch(
const std::exception& e)
215 __COUT__ << e.what() <<
'\n';
// Wait for this client's task to finish before freeing the object it uses.
218 auto clientThread = fConnectedClientsFuture.find(socket.first);
219 if(clientThread != fConnectedClientsFuture.end())
220 clientThread->second.wait();
// The container stores raw owning pointers; the object is freed here.
221 delete socket.second;
223 fConnectedClients.clear();
224 fConnectedClientsFuture.clear();
// Closes a single tracked client identified by its socket descriptor.
// If `socket` is a key of fConnectedClients and the stored object reports the
// same id, sendClose() is called (a std::exception is only logged) and the
// entry is erased. An id mismatch produces the "Impossible" diagnostic below.
// An unknown descriptor is silently ignored as far as the visible lines show.
228 void TCPServerBase::closeClientSocket(
int socket)
231 auto it = fConnectedClients.find(socket);
232 if(it != fConnectedClients.end())
// NOTE(review): it->second is dereferenced here, before the nullptr test two
// lines below, so that test cannot protect this call -- the null check would
// need to come first to be effective.
234 if(it->second->getSocketId() == socket)
238 if(it->second !=
nullptr)
239 it->second->sendClose();
241 catch(
const std::exception& e)
245 __COUT__ << e.what() <<
'\n';
// NOTE(review): no `delete it->second` and no removal from
// fConnectedClientsFuture is visible in this listing, unlike what is visible in
// closeClientSockets() -- confirm ownership on the elided lines.
248 fConnectedClients.erase(it);
253 <<
"SocketId in fConnectedClients != socketId in TCPSocket! Impossible!!!"
// Convenience overload: copies `length` bytes starting at `message` into a
// std::string (embedded NUL bytes are preserved) and forwards to the
// std::string overload of broadcastPacket().
261 void TCPServerBase::broadcastPacket(
const char* message, std::size_t length)
263 broadcastPacket(std::string(message, length));
// Sends `message` to every tracked client via sendPacket(). When a send raises
// std::exception, that client's entries are removed from fConnectedClientsFuture
// (if present) and from fConnectedClients, and iteration continues.
// The definition of `sock` is on a line not shown in this listing.
267 void TCPServerBase::broadcastPacket(
const std::string& message)
269 for(
auto it = fConnectedClients.begin(); it != fConnectedClients.end(); it++)
275 sock->sendPacket(message);
277 catch(
const std::exception& e)
283 if(fConnectedClientsFuture.find(it->first) != fConnectedClientsFuture.end())
284 fConnectedClientsFuture.erase(fConnectedClientsFuture.find(it->first));
// erase(it--) hands the current position to erase() and steps `it` back so the
// loop's it++ lands on the element after the erased one.
// NOTE(review): if the failing client is the first element, `it--` decrements
// begin(), which is undefined behavior for standard containers; the usual safe
// form is `it = container.erase(it)` without the loop increment. The removed
// socket object is also not visibly deleted here -- confirm.
286 fConnectedClients.erase(it--);
// Sends `length` bytes starting at `message` to every tracked client via
// send(). When a send raises std::exception, that client's entries are removed
// from fConnectedClientsFuture (if present) and from fConnectedClients.
// The definition of `sock` is on a line not shown in this listing.
292 void TCPServerBase::broadcast(
const char* message, std::size_t length)
295 for(
auto it = fConnectedClients.begin(); it != fConnectedClients.end(); it++)
301 sock->send(message, length);
303 catch(
const std::exception& e)
309 if(fConnectedClientsFuture.find(it->first) != fConnectedClientsFuture.end())
310 fConnectedClientsFuture.erase(fConnectedClientsFuture.find(it->first));
// erase(it--) erases the current entry and steps `it` back so the loop's it++
// lands on the following element.
// NOTE(review): when the failing client is the first element, `it--`
// decrements begin(), which is undefined behavior for standard containers;
// `it = container.erase(it)` without the loop increment avoids this. No delete
// of the removed socket object is visible here -- confirm.
312 fConnectedClients.erase(it--);
// std::string overload of broadcast(): iterates over every tracked client; the
// actual send call sits on lines not shown in this listing. When std::exception
// is caught for a client, its entries are removed from fConnectedClientsFuture
// (if present) and from fConnectedClients.
318 void TCPServerBase::broadcast(
const std::string& message)
320 for(
auto it = fConnectedClients.begin(); it != fConnectedClients.end(); it++)
328 catch(
const std::exception& e)
334 if(fConnectedClientsFuture.find(it->first) != fConnectedClientsFuture.end())
335 fConnectedClientsFuture.erase(fConnectedClientsFuture.find(it->first));
// erase(it--) erases the current entry and steps `it` back so the loop's it++
// lands on the following element.
// NOTE(review): when the failing client is the first element, `it--`
// decrements begin(), which is undefined behavior for standard containers;
// `it = container.erase(it)` without the loop increment avoids this. No delete
// of the removed socket object is visible here -- confirm.
337 fConnectedClients.erase(it--);
// std::vector<char> overload of broadcast(): iterates over every tracked
// client; the actual send call sits on lines not shown in this listing. When
// std::exception is caught for a client, its entries are removed from
// fConnectedClientsFuture (if present) and from fConnectedClients.
343 void TCPServerBase::broadcast(
const std::vector<char>& message)
345 for(
auto it = fConnectedClients.begin(); it != fConnectedClients.end(); it++)
353 catch(
const std::exception& e)
359 if(fConnectedClientsFuture.find(it->first) != fConnectedClientsFuture.end())
360 fConnectedClientsFuture.erase(fConnectedClientsFuture.find(it->first));
// erase(it--) erases the current entry and steps `it` back so the loop's it++
// lands on the following element.
// NOTE(review): when the failing client is the first element, `it--`
// decrements begin(), which is undefined behavior for standard containers;
// `it = container.erase(it)` without the loop increment avoids this. No delete
// of the removed socket object is visible here -- confirm.
362 fConnectedClients.erase(it--);
// std::vector<uint16_t> overload of broadcast(): iterates over every tracked
// client; the actual send call sits on lines not shown in this listing. When
// std::exception is caught for a client, the error is logged and the client's
// entries are removed from fConnectedClientsFuture (if present) and from
// fConnectedClients, with a log line for each removal.
368 void TCPServerBase::broadcast(
const std::vector<uint16_t>& message)
370 for(
auto it = fConnectedClients.begin(); it != fConnectedClients.end(); it++)
378 catch(
const std::exception& e)
381 __COUT__ <<
"Error: " << e.what() << std::endl;
382 if(fConnectedClientsFuture.find(it->first) != fConnectedClientsFuture.end())
384 __COUT__ <<
"Removing client entry from future connected clients list\n";
385 fConnectedClientsFuture.erase(fConnectedClientsFuture.find(it->first));
387 __COUT__ <<
"Removing client entry from connected clients list\n";
// erase(it--) erases the current entry and steps `it` back so the loop's it++
// lands on the following element.
// NOTE(review): when the failing client is the first element, `it--`
// decrements begin(), which is undefined behavior for standard containers;
// `it = container.erase(it)` without the loop increment avoids this. No delete
// of the removed socket object is visible here -- confirm.
389 fConnectedClients.erase(it--);
// Probes every tracked client by sending an empty (zero-length) message. A
// client whose send raises std::exception is logged and removed from
// fConnectedClientsFuture (if present) and from fConnectedClients.
// The definition of `sock` is on a line not shown in this listing.
395 void TCPServerBase::pingActiveClients()
397 for(
auto it = fConnectedClients.begin(); it != fConnectedClients.end(); it++)
399 __COUT__ <<
"Pinging client " << it->first <<
" : " << it->second << std::endl;
// Zero-byte send; the meaning of the third argument (`true`) is defined by the
// socket class and is not visible here.
404 sock->send(
"", 0,
true);
406 catch(
const std::exception& e)
411 __COUT__ <<
"Error: " << e.what() << std::endl;
412 if(fConnectedClientsFuture.find(it->first) != fConnectedClientsFuture.end())
414 __COUT__ <<
"Removing client entry from future connected clients list\n";
415 fConnectedClientsFuture.erase(fConnectedClientsFuture.find(it->first));
417 __COUT__ <<
"Removing client entry from connected clients list\n";
// erase(it--) erases the current entry and steps `it` back so the loop's it++
// lands on the following element.
// NOTE(review): when the failing client is the first element, `it--`
// decrements begin(), which is undefined behavior for standard containers;
// `it = container.erase(it)` without the loop increment avoids this. No delete
// of the removed socket object is visible here -- confirm.
419 fConnectedClients.erase(it--);
// Shuts down the receive direction (SHUT_RD) of the listening socket.
// Presumably this is what makes a pending ::accept() on that socket return so
// the accept task can finish -- confirm. At least one line of the original body
// is missing from this listing (numbering jumps 425->428).
425 void TCPServerBase::shutdownAccept()
428 shutdown(getSocketId(), SHUT_RD);
TCPServerBase(unsigned int serverPort, unsigned int maxNumberOfClients=0)
Means as many clients as an unsigned int allows.
A class that can write to a socket.
defines used also by OtsConfigurationWizardSupervisor