1 #include "otsdaq/NetworkUtilities/TransceiverSocket.h"
2 #include "otsdaq/Macros/CoutMacros.h"
3 #include "otsdaq/MessageFacility/MessageFacility.h"
20 TransceiverSocket::TransceiverSocket(
void)
22 __COUT__ <<
"TransceiverSocket constructor " << __E__;
26 TransceiverSocket::TransceiverSocket(std::string IPAddress,
unsigned int port)
29 __COUT__ <<
"TransceiverSocket constructor " << IPAddress <<
":" << port << __E__;
33 TransceiverSocket::~TransceiverSocket(
void) {}
42 unsigned int interPacketGapUSeconds ,
43 bool enableRetransmission )
46 __COUTT__ <<
"Acknowledging on Socket Descriptor #: " << socketNumber_
47 <<
" from-port: " << ntohs(socketAddress_.sin_port)
48 <<
" to-port: " << ntohs(ReceiverSocket::fromAddress_.sin_port)
49 <<
" retransmission: " << (enableRetransmission ?
"ON" :
"OFF")
52 if(!enableRetransmission)
58 std::lock_guard<std::mutex> lock(sendMutex_);
60 const size_t MAX_SEND_SIZE =
61 maxChunkSize > 65500u ?
static_cast<size_t>(65500u) : maxChunkSize;
66 while(offset < buffer.size() && sendToSize > 0)
68 auto thisSize = sizeInBytes * (buffer.size() - offset) > MAX_SEND_SIZE
70 : sizeInBytes * (buffer.size() - offset);
73 sendToSize = sendto(socketNumber_,
77 (
struct sockaddr*)&(ReceiverSocket::fromAddress_),
79 offset += sendToSize / sizeInBytes;
80 if(interPacketGapUSeconds > 0 && offset < buffer.size() && sendToSize > 0)
81 usleep(interPacketGapUSeconds);
86 __SS__ <<
"Error writing buffer from port "
87 << ntohs(TransmitterSocket::socketAddress_.sin_port) <<
": "
88 << strerror(errno) << std::endl;
98 return sendAll(buffer, verbose, maxChunkSize, interPacketGapUSeconds);
113 size_t maxChunkSize ,
114 unsigned int interPacketGapUSeconds )
117 __COUT__ <<
"sendAll: retransmission-mode send on Socket Descriptor #: "
118 << socketNumber_ <<
" from-port: " << ntohs(socketAddress_.sin_port)
119 <<
" to-port: " << ntohs(ReceiverSocket::fromAddress_.sin_port)
120 <<
" buffer size: " << buffer.size() << __E__;
122 const size_t MAX_SEND_SIZE =
123 maxChunkSize > 65500u ?
static_cast<size_t>(65500u) : maxChunkSize;
126 const size_t payloadMax = MAX_SEND_SIZE > RETRANSMIT_HEADER_SIZE
127 ? MAX_SEND_SIZE - RETRANSMIT_HEADER_SIZE
132 const size_t packetsNeeded = (buffer.size() + payloadMax - 1) / payloadMax;
133 if(packetsNeeded > std::numeric_limits<uint16_t>::max())
135 __SS__ <<
"sendAll: buffer size " << buffer.size() <<
" requires "
137 <<
" packets, which exceeds the uint16_t protocol limit of "
138 << std::numeric_limits<uint16_t>::max() <<
" (payloadMax=" << payloadMax
139 <<
")." << std::endl;
142 uint16_t totalPackets =
static_cast<uint16_t
>(packetsNeeded);
143 if(totalPackets == 0)
147 __COUT__ <<
"sendAll: sending " << totalPackets <<
" packets for "
148 << buffer.size() <<
" bytes, payloadMax=" << payloadMax << __E__;
151 std::vector<std::string> packets(totalPackets);
154 for(uint16_t pi = 0; pi < totalPackets; ++pi)
156 size_t payloadSize = (buffer.size() - offset) > payloadMax
158 : (buffer.size() - offset);
160 char header[RETRANSMIT_HEADER_SIZE];
162 uint16_t netIndex = htons(pi);
163 uint16_t netTotal = htons(totalPackets);
164 uint16_t netPaySize = htons(
static_cast<uint16_t
>(payloadSize));
165 std::memcpy(header + 0, &netMagic, 2);
166 std::memcpy(header + 2, &netIndex, 2);
167 std::memcpy(header + 4, &netTotal, 2);
168 std::memcpy(header + 6, &netPaySize, 2);
170 packets[pi].assign(header, RETRANSMIT_HEADER_SIZE);
171 packets[pi].append(buffer, offset, payloadSize);
172 offset += payloadSize;
178 std::lock_guard<std::mutex> lock(sendMutex_);
179 for(uint16_t pi = 0; pi < totalPackets; ++pi)
181 int sendToSize = sendto(socketNumber_,
185 (
struct sockaddr*)&(ReceiverSocket::fromAddress_),
186 sizeof(sockaddr_in));
189 __SS__ <<
"sendAll: error writing packet " << pi <<
"/" << totalPackets
190 <<
" from port " << ntohs(socketAddress_.sin_port) <<
": "
191 << strerror(errno) << std::endl;
195 __COUTT__ <<
"sendAll: sent packet " << pi <<
"/" << totalPackets
196 <<
" size=" << packets[pi].size() << std::endl;
198 if(interPacketGapUSeconds > 0 && pi + 1 < totalPackets)
199 usleep(interPacketGapUSeconds);
206 const unsigned int retransmitTimeoutSeconds = 5;
207 const unsigned int maxRetransmitRounds = 20;
209 for(
unsigned int round = 0; round < maxRetransmitRounds; ++round)
211 std::string retransmitRequest;
212 int rc =
receive(retransmitRequest,
213 retransmitTimeoutSeconds,
220 __COUT__ <<
"sendAll: no retransmit request after "
221 << retransmitTimeoutSeconds
222 <<
"s timeout, assuming transfer complete." << __E__;
226 if(retransmitRequest.size() < 4)
230 std::memcpy(&reqMagic, retransmitRequest.data(), 2);
231 reqMagic = ntohs(reqMagic);
237 std::memcpy(&firstVal, retransmitRequest.data() + 2, 2);
238 firstVal = ntohs(firstVal);
239 if(firstVal == 0xFFFF)
242 __COUT__ <<
"sendAll: received 'all done' from receiver." << __E__;
247 size_t numIndices = (retransmitRequest.size() - 2) / 2;
249 __COUT__ <<
"sendAll: retransmit request for " << numIndices
250 <<
" packets (round " << round <<
")." << __E__;
253 std::lock_guard<std::mutex> lock(sendMutex_);
254 for(
size_t i = 0; i < numIndices; ++i)
257 std::memcpy(&missingIdx, retransmitRequest.data() + 2 + i * 2, 2);
258 missingIdx = ntohs(missingIdx);
260 if(missingIdx < totalPackets)
262 int sendToSize = sendto(socketNumber_,
263 packets[missingIdx].data(),
264 packets[missingIdx].size(),
266 (
struct sockaddr*)&(ReceiverSocket::fromAddress_),
267 sizeof(sockaddr_in));
270 __SS__ <<
"sendAll: error resending packet " << missingIdx <<
": "
271 << strerror(errno) << std::endl;
275 __COUTT__ <<
"sendAll: resent packet " << missingIdx << std::endl;
277 if(interPacketGapUSeconds > 0)
278 usleep(interPacketGapUSeconds);
282 __COUT_WARN__ <<
"sendAll: retransmit request for invalid packet index "
283 << missingIdx <<
" (total=" << totalPackets <<
")" << __E__;
297 const std::string& sendBuffer,
298 unsigned int timeoutSeconds ,
299 unsigned int timeoutUSeconds ,
301 unsigned int interPacketTimeoutUSeconds )
303 using clock = std::chrono::steady_clock;
304 auto start = clock::now();
307 std::lock_guard<std::mutex> lock(
308 sendAndReceiveMutex_);
311 send(toSocket, sendBuffer, verbose);
313 __COUTT__ <<
" ----> Time sendAndReceive '" << sendBuffer
314 <<
"' (socketNumber=" << socketNumber_ <<
") check ==> "
315 << std::chrono::duration_cast<std::chrono::milliseconds>(clock::now() -
318 <<
" milliseconds. PID=" << getpid()
319 <<
" TID=" << std::this_thread::get_id() << std::endl;
321 std::string receiveBuffer;
322 if(
receive(receiveBuffer, timeoutSeconds, timeoutUSeconds, verbose) < 0)
324 __SS__ <<
"Timeout (" << timeoutSeconds + timeoutUSeconds / 1000000.
325 <<
" s) or Error receiving response buffer from remote ip:port "
326 << toSocket.getIPAddress() <<
":" << toSocket.getPort()
327 <<
" to this ip:port " << Socket::getIPAddress() <<
":"
328 << Socket::getPort() << __E__;
332 __COUTT__ <<
" ----> Time sendAndReceive '" << sendBuffer <<
"' got "
333 << receiveBuffer.size() <<
" (socketNumber=" << socketNumber_
335 << std::chrono::duration_cast<std::chrono::milliseconds>(clock::now() -
338 <<
" milliseconds. PID=" << getpid()
339 <<
" TID=" << std::this_thread::get_id() << std::endl;
342 std::string receiveBuffer2;
345 (timeoutSeconds == 0 && timeoutUSeconds < interPacketTimeoutUSeconds)
347 : interPacketTimeoutUSeconds,
350 receiveBuffer += receiveBuffer2;
352 __COUTT__ <<
" ----> Time sendAndReceive +" << receiveBuffer2.size()
354 << std::chrono::duration_cast<std::chrono::milliseconds>(clock::now() -
357 <<
" milliseconds." << std::endl;
360 __COUTT__ <<
" ----> Time sendAndReceive " << receiveBuffer.size() <<
" check ==> "
361 << std::chrono::duration_cast<std::chrono::milliseconds>(clock::now() -
364 <<
" milliseconds." << std::endl;
366 return receiveBuffer;
385 unsigned int timeoutSeconds ,
386 unsigned int retransmitMaxRetries ,
389 using clock = std::chrono::steady_clock;
390 auto start = clock::now();
393 std::map<uint16_t, std::string> receivedPackets;
394 uint16_t totalPackets = 0;
395 bool totalKnown =
false;
398 __COUT__ <<
"receiveAll: waiting for retransmission-mode packets, timeout="
399 << timeoutSeconds <<
"s" << __E__;
404 const unsigned int interPacketTimeoutUSeconds = 100000;
405 bool firstPacketReceived =
false;
409 std::string rawPacket;
411 firstPacketReceived ? 0 : timeoutSeconds,
412 firstPacketReceived ? interPacketTimeoutUSeconds : 0,
417 if(!firstPacketReceived)
421 __COUT__ <<
"receiveAll: timeout waiting for first packet after "
422 << timeoutSeconds <<
"s" << __E__;
430 if(rawPacket.size() < RETRANSMIT_HEADER_SIZE)
434 if(!firstPacketReceived)
441 __COUT_WARN__ <<
"receiveAll: skipping undersized packet ("
442 << rawPacket.size() <<
" bytes)" << __E__;
447 uint16_t magic, packetIndex, pktTotal, payloadSize;
448 std::memcpy(&magic, rawPacket.data() + 0, 2);
449 std::memcpy(&packetIndex, rawPacket.data() + 2, 2);
450 std::memcpy(&pktTotal, rawPacket.data() + 4, 2);
451 std::memcpy(&payloadSize, rawPacket.data() + 6, 2);
452 magic = ntohs(magic);
453 packetIndex = ntohs(packetIndex);
454 pktTotal = ntohs(pktTotal);
455 payloadSize = ntohs(payloadSize);
460 if(!firstPacketReceived)
466 __COUT_WARN__ <<
"receiveAll: skipping packet with bad magic 0x"
467 << std::hex << magic << std::dec << __E__;
471 firstPacketReceived =
true;
472 totalPackets = pktTotal;
476 size_t actualPayload = rawPacket.size() - RETRANSMIT_HEADER_SIZE;
477 if(actualPayload > payloadSize)
478 actualPayload = payloadSize;
480 receivedPackets[packetIndex] =
481 rawPacket.substr(RETRANSMIT_HEADER_SIZE, actualPayload);
484 __COUTT__ <<
"receiveAll: received packet " << packetIndex <<
"/"
485 << totalPackets <<
" payload=" << actualPayload
486 <<
" total_received=" << receivedPackets.size() << std::endl;
489 if(totalKnown && receivedPackets.size() >=
static_cast<size_t>(totalPackets))
494 std::chrono::duration_cast<std::chrono::seconds>(clock::now() - start);
495 if(elapsed.count() >=
496 static_cast<long>(timeoutSeconds * (retransmitMaxRetries + 1)))
499 __COUT_WARN__ <<
"receiveAll: overall timeout reached" << __E__;
505 if(totalKnown && receivedPackets.size() <
static_cast<size_t>(totalPackets))
507 for(
unsigned int retry = 0; retry < retransmitMaxRetries; ++retry)
510 std::set<uint16_t> missing;
511 for(uint16_t i = 0; i < totalPackets; ++i)
513 if(receivedPackets.find(i) == receivedPackets.end())
521 __COUT__ <<
"receiveAll: retry " << retry + 1 <<
"/"
522 << retransmitMaxRetries <<
", requesting retransmit of "
523 << missing.size() <<
" packets" << __E__;
526 std::string retransmitReq;
527 retransmitReq.resize(2 + missing.size() * 2);
529 std::memcpy(&retransmitReq[0], &netMagic, 2);
531 for(uint16_t idx : missing)
533 uint16_t netIdx = htons(idx);
534 std::memcpy(&retransmitReq[pos], &netIdx, 2);
541 int sendToSize = sendto(socketNumber_,
542 retransmitReq.data(),
543 retransmitReq.size(),
545 (
struct sockaddr*)&(ReceiverSocket::fromAddress_),
546 sizeof(sockaddr_in));
549 __COUT_WARN__ <<
"receiveAll: failed to send retransmit request: "
550 << strerror(errno) << __E__;
557 std::string rawPacket;
559 rawPacket, timeoutSeconds, 0 ,
false );
563 if(rawPacket.size() < RETRANSMIT_HEADER_SIZE)
566 uint16_t magic2, packetIndex2, pktTotal2, payloadSize2;
567 std::memcpy(&magic2, rawPacket.data() + 0, 2);
568 std::memcpy(&packetIndex2, rawPacket.data() + 2, 2);
569 std::memcpy(&pktTotal2, rawPacket.data() + 4, 2);
570 std::memcpy(&payloadSize2, rawPacket.data() + 6, 2);
571 magic2 = ntohs(magic2);
572 packetIndex2 = ntohs(packetIndex2);
573 pktTotal2 = ntohs(pktTotal2);
574 payloadSize2 = ntohs(payloadSize2);
579 size_t actualPayload2 = rawPacket.size() - RETRANSMIT_HEADER_SIZE;
580 if(actualPayload2 > payloadSize2)
581 actualPayload2 = payloadSize2;
583 receivedPackets[packetIndex2] =
584 rawPacket.substr(RETRANSMIT_HEADER_SIZE, actualPayload2);
587 __COUTT__ <<
"receiveAll: retransmit received packet " << packetIndex2
588 <<
"/" << totalPackets
589 <<
" total_received=" << receivedPackets.size()
593 if(receivedPackets.size() >=
static_cast<size_t>(totalPackets))
597 if(receivedPackets.size() >=
static_cast<size_t>(totalPackets))
604 std::string doneSignal(4,
'\0');
606 uint16_t netDone = htons(0xFFFF);
607 std::memcpy(&doneSignal[0], &netMagic, 2);
608 std::memcpy(&doneSignal[2], &netDone, 2);
609 sendto(socketNumber_,
613 (
struct sockaddr*)&(ReceiverSocket::fromAddress_),
614 sizeof(sockaddr_in));
618 if(!totalKnown || receivedPackets.empty())
620 __SS__ <<
"receiveAll: failed to receive any retransmission-mode packets"
625 if(receivedPackets.size() <
static_cast<size_t>(totalPackets))
628 std::string missingStr;
629 for(uint16_t i = 0; i < totalPackets; ++i)
631 if(receivedPackets.find(i) == receivedPackets.end())
633 if(!missingStr.empty())
635 missingStr += std::to_string(i);
638 __SS__ <<
"receiveAll: failed to receive all packets after "
639 << retransmitMaxRetries <<
" retransmit retries. "
640 <<
"Received " << receivedPackets.size() <<
"/" << totalPackets
641 <<
" packets. Missing indices: [" << missingStr <<
"]" << __E__;
647 for(uint16_t i = 0; i < totalPackets; ++i)
648 buffer += receivedPackets[i];
651 __COUT__ <<
"receiveAll: successfully assembled " << buffer.size()
652 <<
" bytes from " << totalPackets <<
" packets in "
653 << std::chrono::duration_cast<std::chrono::milliseconds>(clock::now() -
676 const std::string& sendBuffer,
677 unsigned int timeoutSeconds ,
678 unsigned int retransmitMaxRetries ,
681 using clock = std::chrono::steady_clock;
682 auto start = clock::now();
685 std::lock_guard<std::mutex> lock(sendAndReceiveMutex_);
688 send(toSocket, sendBuffer, verbose);
690 __COUTT__ <<
" ----> Time sendAndReceiveAll '" << sendBuffer
691 <<
"' (socketNumber=" << socketNumber_ <<
") check ==> "
692 << std::chrono::duration_cast<std::chrono::milliseconds>(clock::now() -
695 <<
" milliseconds. PID=" << getpid()
696 <<
" TID=" << std::this_thread::get_id() << std::endl;
698 std::string receiveBuffer;
699 if(
receiveAll(receiveBuffer, timeoutSeconds, retransmitMaxRetries, verbose) < 0)
701 __SS__ <<
"Timeout (" << timeoutSeconds
702 <<
" s) or Error receiving retransmission response from remote ip:port "
703 << toSocket.getIPAddress() <<
":" << toSocket.getPort()
704 <<
" to this ip:port " << Socket::getIPAddress() <<
":"
705 << Socket::getPort() << __E__;
709 __COUTT__ <<
" ----> Time sendAndReceiveAll complete: " << receiveBuffer.size()
710 <<
" bytes (socketNumber=" << socketNumber_ <<
") ==> "
711 << std::chrono::duration_cast<std::chrono::milliseconds>(clock::now() -
714 <<
" milliseconds. PID=" << getpid()
715 <<
" TID=" << std::this_thread::get_id() << std::endl;
717 return receiveBuffer;
int receive(std::string &buffer, unsigned int timeoutSeconds=1, unsigned int timeoutUSeconds=0, bool verbose=false)
returns count of dropped packets
int acknowledge(const std::string &buffer, bool verbose=false, size_t maxChunkSize=1500, unsigned int interPacketGapUSeconds=0, bool enableRetransmission=false)
std::string sendAndReceiveAll(Socket &toSocket, const std::string &sendBuffer, unsigned int timeoutSeconds=5, unsigned int retransmitMaxRetries=10, bool verbose=false)
static constexpr uint16_t RETRANSMIT_MAGIC
Retransmission protocol constants.
int sendAll(const std::string &buffer, bool verbose=false, size_t maxChunkSize=65500, unsigned int interPacketGapUSeconds=0)
std::string sendAndReceive(Socket &toSocket, const std::string &sendBuffer, unsigned int timeoutSeconds=1, unsigned int timeoutUSeconds=0, bool verbose=false, unsigned int interPacketTimeoutUSeconds=10000)
int receiveAll(std::string &buffer, unsigned int timeoutSeconds=5, unsigned int retransmitMaxRetries=10, bool verbose=false)
defines used also by OtsConfigurationWizardSupervisor