otsdaq  3.09.00
TransceiverSocket.cc
1 #include "otsdaq/NetworkUtilities/TransceiverSocket.h"
2 #include "otsdaq/Macros/CoutMacros.h"
3 #include "otsdaq/MessageFacility/MessageFacility.h"
4 
5 #include <arpa/inet.h>
6 #include <unistd.h>
7 #include <chrono>
8 #include <cstring>
9 #include <iostream>
10 #include <limits>
11 #include <map>
12 #include <mutex>
13 #include <set>
14 #include <thread>
15 #include <vector>
16 
17 using namespace ots;
18 
19 //==============================================================================
20 TransceiverSocket::TransceiverSocket(void)
21 {
22  __COUT__ << "TransceiverSocket constructor " << __E__;
23 }
24 
25 //==============================================================================
26 TransceiverSocket::TransceiverSocket(std::string IPAddress, unsigned int port)
27  : Socket(IPAddress, port)
28 {
29  __COUT__ << "TransceiverSocket constructor " << IPAddress << ":" << port << __E__;
30 }
31 
32 //==============================================================================
33 TransceiverSocket::~TransceiverSocket(void) {}
34 
35 //==============================================================================
39 int TransceiverSocket::acknowledge(const std::string& buffer,
40  bool verbose /* = false */,
41  size_t maxChunkSize /* = 1500 */,
42  unsigned int interPacketGapUSeconds /* = 0 */,
43  bool enableRetransmission /* = false */)
44 {
45  if(verbose)
46  __COUTT__ << "Acknowledging on Socket Descriptor #: " << socketNumber_
47  << " from-port: " << ntohs(socketAddress_.sin_port)
48  << " to-port: " << ntohs(ReceiverSocket::fromAddress_.sin_port)
49  << " retransmission: " << (enableRetransmission ? "ON" : "OFF")
50  << std::endl;
51 
52  if(!enableRetransmission)
53  {
54  //====================================================================
55  // Original non-retransmission mode (unchanged behavior)
56  //====================================================================
57  // lockout other senders for the remainder of this scope
58  std::lock_guard<std::mutex> lock(sendMutex_);
59 
60  const size_t MAX_SEND_SIZE =
61  maxChunkSize > 65500u ? static_cast<size_t>(65500u) : maxChunkSize;
62  size_t offset = 0;
63  int sendToSize = 1;
64  int sizeInBytes = 1;
65 
66  while(offset < buffer.size() && sendToSize > 0)
67  {
68  auto thisSize = sizeInBytes * (buffer.size() - offset) > MAX_SEND_SIZE
69  ? MAX_SEND_SIZE
70  : sizeInBytes * (buffer.size() - offset);
71  if(verbose)
72  __COUTTV__(thisSize);
73  sendToSize = sendto(socketNumber_,
74  &buffer[0] + offset,
75  thisSize,
76  0,
77  (struct sockaddr*)&(ReceiverSocket::fromAddress_),
78  sizeof(sockaddr_in));
79  offset += sendToSize / sizeInBytes;
80  if(interPacketGapUSeconds > 0 && offset < buffer.size() && sendToSize > 0)
81  usleep(interPacketGapUSeconds);
82  }
83 
84  if(sendToSize <= 0)
85  {
86  __SS__ << "Error writing buffer from port "
87  << ntohs(TransmitterSocket::socketAddress_.sin_port) << ": "
88  << strerror(errno) << std::endl;
89  __SS_THROW__;
90  }
91  return 0;
92  }
93 
94  //====================================================================
95  // Retransmission mode: delegate entirely to sendAll() which handles
96  // packet building, initial send, and retransmit request handling.
97  //====================================================================
98  return sendAll(buffer, verbose, maxChunkSize, interPacketGapUSeconds);
99 } //end acknowledge()
100 
101 //==============================================================================
111 int TransceiverSocket::sendAll(const std::string& buffer,
112  bool verbose /* = false */,
113  size_t maxChunkSize /* = 65500 */,
114  unsigned int interPacketGapUSeconds /* = 0 */)
115 {
116  if(verbose)
117  __COUT__ << "sendAll: retransmission-mode send on Socket Descriptor #: "
118  << socketNumber_ << " from-port: " << ntohs(socketAddress_.sin_port)
119  << " to-port: " << ntohs(ReceiverSocket::fromAddress_.sin_port)
120  << " buffer size: " << buffer.size() << __E__;
121 
122  const size_t MAX_SEND_SIZE =
123  maxChunkSize > 65500u ? static_cast<size_t>(65500u) : maxChunkSize;
124 
125  // The payload per packet is reduced by the header size
126  const size_t payloadMax = MAX_SEND_SIZE > RETRANSMIT_HEADER_SIZE
127  ? MAX_SEND_SIZE - RETRANSMIT_HEADER_SIZE
128  : 1;
129 
130  // Calculate total number of packets. The on-wire header carries the count
131  // as uint16_t, so reject buffers that would require more than 65535 packets.
132  const size_t packetsNeeded = (buffer.size() + payloadMax - 1) / payloadMax;
133  if(packetsNeeded > std::numeric_limits<uint16_t>::max())
134  {
135  __SS__ << "sendAll: buffer size " << buffer.size() << " requires "
136  << packetsNeeded
137  << " packets, which exceeds the uint16_t protocol limit of "
138  << std::numeric_limits<uint16_t>::max() << " (payloadMax=" << payloadMax
139  << ")." << std::endl;
140  __SS_THROW__;
141  }
142  uint16_t totalPackets = static_cast<uint16_t>(packetsNeeded);
143  if(totalPackets == 0)
144  totalPackets = 1; // send at least one packet even for empty buffer
145 
146  if(verbose)
147  __COUT__ << "sendAll: sending " << totalPackets << " packets for "
148  << buffer.size() << " bytes, payloadMax=" << payloadMax << __E__;
149 
150  // Build and cache all packets (header + payload) for retransmit use
151  std::vector<std::string> packets(totalPackets);
152  {
153  size_t offset = 0;
154  for(uint16_t pi = 0; pi < totalPackets; ++pi)
155  {
156  size_t payloadSize = (buffer.size() - offset) > payloadMax
157  ? payloadMax
158  : (buffer.size() - offset);
159 
160  char header[RETRANSMIT_HEADER_SIZE];
161  uint16_t netMagic = htons(RETRANSMIT_MAGIC);
162  uint16_t netIndex = htons(pi);
163  uint16_t netTotal = htons(totalPackets);
164  uint16_t netPaySize = htons(static_cast<uint16_t>(payloadSize));
165  std::memcpy(header + 0, &netMagic, 2);
166  std::memcpy(header + 2, &netIndex, 2);
167  std::memcpy(header + 4, &netTotal, 2);
168  std::memcpy(header + 6, &netPaySize, 2);
169 
170  packets[pi].assign(header, RETRANSMIT_HEADER_SIZE);
171  packets[pi].append(buffer, offset, payloadSize);
172  offset += payloadSize;
173  }
174  }
175 
176  // Send all packets initially (lock sendMutex_ for the burst)
177  {
178  std::lock_guard<std::mutex> lock(sendMutex_);
179  for(uint16_t pi = 0; pi < totalPackets; ++pi)
180  {
181  int sendToSize = sendto(socketNumber_,
182  packets[pi].data(),
183  packets[pi].size(),
184  0,
185  (struct sockaddr*)&(ReceiverSocket::fromAddress_),
186  sizeof(sockaddr_in));
187  if(sendToSize <= 0)
188  {
189  __SS__ << "sendAll: error writing packet " << pi << "/" << totalPackets
190  << " from port " << ntohs(socketAddress_.sin_port) << ": "
191  << strerror(errno) << std::endl;
192  __SS_THROW__;
193  }
194  if(verbose)
195  __COUTT__ << "sendAll: sent packet " << pi << "/" << totalPackets
196  << " size=" << packets[pi].size() << std::endl;
197 
198  if(interPacketGapUSeconds > 0 && pi + 1 < totalPackets)
199  usleep(interPacketGapUSeconds);
200  }
201  }
202 
203  // Wait for retransmit requests from receiver.
204  // Retransmit request format: magic(2 bytes) + list of uint16 missing indices
205  // Done signal format: magic(2 bytes) + 0xFFFF(2 bytes)
206  const unsigned int retransmitTimeoutSeconds = 5;
207  const unsigned int maxRetransmitRounds = 20;
208 
209  for(unsigned int round = 0; round < maxRetransmitRounds; ++round)
210  {
211  std::string retransmitRequest;
212  int rc = receive(retransmitRequest,
213  retransmitTimeoutSeconds,
214  0 /*timeoutUSeconds*/,
215  false /*verbose*/);
216  if(rc < 0)
217  {
218  // Timeout - assume receiver got everything (or gave up)
219  if(verbose)
220  __COUT__ << "sendAll: no retransmit request after "
221  << retransmitTimeoutSeconds
222  << "s timeout, assuming transfer complete." << __E__;
223  break;
224  }
225 
226  if(retransmitRequest.size() < 4)
227  continue;
228 
229  uint16_t reqMagic;
230  std::memcpy(&reqMagic, retransmitRequest.data(), 2);
231  reqMagic = ntohs(reqMagic);
232  if(reqMagic != RETRANSMIT_MAGIC)
233  continue;
234 
235  // Check for "done" signal (magic + 0xFFFF)
236  uint16_t firstVal;
237  std::memcpy(&firstVal, retransmitRequest.data() + 2, 2);
238  firstVal = ntohs(firstVal);
239  if(firstVal == 0xFFFF)
240  {
241  if(verbose)
242  __COUT__ << "sendAll: received 'all done' from receiver." << __E__;
243  break;
244  }
245 
246  // Parse list of missing packet indices and resend them
247  size_t numIndices = (retransmitRequest.size() - 2) / 2;
248  if(verbose)
249  __COUT__ << "sendAll: retransmit request for " << numIndices
250  << " packets (round " << round << ")." << __E__;
251 
252  // Lock sendMutex_ for the resend burst
253  std::lock_guard<std::mutex> lock(sendMutex_);
254  for(size_t i = 0; i < numIndices; ++i)
255  {
256  uint16_t missingIdx;
257  std::memcpy(&missingIdx, retransmitRequest.data() + 2 + i * 2, 2);
258  missingIdx = ntohs(missingIdx);
259 
260  if(missingIdx < totalPackets)
261  {
262  int sendToSize = sendto(socketNumber_,
263  packets[missingIdx].data(),
264  packets[missingIdx].size(),
265  0,
266  (struct sockaddr*)&(ReceiverSocket::fromAddress_),
267  sizeof(sockaddr_in));
268  if(sendToSize <= 0)
269  {
270  __SS__ << "sendAll: error resending packet " << missingIdx << ": "
271  << strerror(errno) << std::endl;
272  __SS_THROW__;
273  }
274  if(verbose)
275  __COUTT__ << "sendAll: resent packet " << missingIdx << std::endl;
276 
277  if(interPacketGapUSeconds > 0)
278  usleep(interPacketGapUSeconds);
279  }
280  else
281  {
282  __COUT_WARN__ << "sendAll: retransmit request for invalid packet index "
283  << missingIdx << " (total=" << totalPackets << ")" << __E__;
284  }
285  }
286  }
287 
288  return 0;
289 } //end sendAll()
290 
291 //==============================================================================
296  Socket& toSocket,
297  const std::string& sendBuffer,
298  unsigned int timeoutSeconds /* = 1 */,
299  unsigned int timeoutUSeconds /* = 0 */,
300  bool verbose /* = false */,
301  unsigned int interPacketTimeoutUSeconds /* = 10000 */)
302 {
303  using clock = std::chrono::steady_clock;
304  auto start = clock::now();
305 
306  // lockout other sender and receive attempts for the remainder of the scope
307  std::lock_guard<std::mutex> lock(
308  sendAndReceiveMutex_); //note that TransmitterSocket::sendMutex_ is not enough
309 
310  flush(); //make sure nothing to read before sending
311  send(toSocket, sendBuffer, verbose);
312 
313  __COUTT__ << " ----> Time sendAndReceive '" << sendBuffer
314  << "' (socketNumber=" << socketNumber_ << ") check ==> "
315  << std::chrono::duration_cast<std::chrono::milliseconds>(clock::now() -
316  start)
317  .count()
318  << " milliseconds. PID=" << getpid()
319  << " TID=" << std::this_thread::get_id() << std::endl;
320 
321  std::string receiveBuffer;
322  if(receive(receiveBuffer, timeoutSeconds, timeoutUSeconds, verbose) < 0)
323  {
324  __SS__ << "Timeout (" << timeoutSeconds + timeoutUSeconds / 1000000.
325  << " s) or Error receiving response buffer from remote ip:port "
326  << toSocket.getIPAddress() << ":" << toSocket.getPort()
327  << " to this ip:port " << Socket::getIPAddress() << ":"
328  << Socket::getPort() << __E__;
329  __SS_ONLY_THROW__;
330  }
331 
332  __COUTT__ << " ----> Time sendAndReceive '" << sendBuffer << "' got "
333  << receiveBuffer.size() << " (socketNumber=" << socketNumber_
334  << ") check ==> "
335  << std::chrono::duration_cast<std::chrono::milliseconds>(clock::now() -
336  start)
337  .count()
338  << " milliseconds. PID=" << getpid()
339  << " TID=" << std::this_thread::get_id() << std::endl;
340 
341  //assume response may be multiple packets! (and give interPacketTimeoutUSeconds unless called with lower timeout)
342  std::string receiveBuffer2;
343  while(receive(receiveBuffer2,
344  0 /*timeoutSeconds*/,
345  (timeoutSeconds == 0 && timeoutUSeconds < interPacketTimeoutUSeconds)
346  ? timeoutUSeconds
347  : interPacketTimeoutUSeconds,
348  verbose) >= 0)
349  {
350  receiveBuffer += receiveBuffer2; //append
351 
352  __COUTT__ << " ----> Time sendAndReceive +" << receiveBuffer2.size()
353  << " check ==> "
354  << std::chrono::duration_cast<std::chrono::milliseconds>(clock::now() -
355  start)
356  .count()
357  << " milliseconds." << std::endl;
358  }
359 
360  __COUTT__ << " ----> Time sendAndReceive " << receiveBuffer.size() << " check ==> "
361  << std::chrono::duration_cast<std::chrono::milliseconds>(clock::now() -
362  start)
363  .count()
364  << " milliseconds." << std::endl;
365 
366  return receiveBuffer;
367 } //end sendAndReceive()
368 
369 //==============================================================================
384 int TransceiverSocket::receiveAll(std::string& buffer,
385  unsigned int timeoutSeconds /* = 5 */,
386  unsigned int retransmitMaxRetries /* = 10 */,
387  bool verbose /* = false */)
388 {
389  using clock = std::chrono::steady_clock;
390  auto start = clock::now();
391 
392  // Map of packet index -> payload data
393  std::map<uint16_t, std::string> receivedPackets;
394  uint16_t totalPackets = 0;
395  bool totalKnown = false;
396 
397  if(verbose)
398  __COUT__ << "receiveAll: waiting for retransmission-mode packets, timeout="
399  << timeoutSeconds << "s" << __E__;
400 
401  // Phase 1: Receive all initial packets until timeout
402  // Use a per-packet timeout that is shorter than the overall timeout,
403  // so we can detect "no more packets arriving" vs "still waiting for first"
404  const unsigned int interPacketTimeoutUSeconds = 100000; // 100ms between packets
405  bool firstPacketReceived = false;
406 
407  while(true)
408  {
409  std::string rawPacket;
410  int rc = receive(rawPacket,
411  firstPacketReceived ? 0 : timeoutSeconds,
412  firstPacketReceived ? interPacketTimeoutUSeconds : 0,
413  false /*verbose*/);
414 
415  if(rc < 0)
416  {
417  if(!firstPacketReceived)
418  {
419  // Never received any packet at all
420  if(verbose)
421  __COUT__ << "receiveAll: timeout waiting for first packet after "
422  << timeoutSeconds << "s" << __E__;
423  return -1;
424  }
425  // Timeout between packets - move to retransmit phase
426  break;
427  }
428 
429  // Check for retransmission header
430  if(rawPacket.size() < RETRANSMIT_HEADER_SIZE)
431  {
432  // Too small to be a retransmission packet - might be a non-retransmit
433  // response; just return it as-is
434  if(!firstPacketReceived)
435  {
436  buffer = rawPacket;
437  return 0;
438  }
439  // Skip malformed packet during multi-packet receive
440  if(verbose)
441  __COUT_WARN__ << "receiveAll: skipping undersized packet ("
442  << rawPacket.size() << " bytes)" << __E__;
443  continue;
444  }
445 
446  // Parse header
447  uint16_t magic, packetIndex, pktTotal, payloadSize;
448  std::memcpy(&magic, rawPacket.data() + 0, 2);
449  std::memcpy(&packetIndex, rawPacket.data() + 2, 2);
450  std::memcpy(&pktTotal, rawPacket.data() + 4, 2);
451  std::memcpy(&payloadSize, rawPacket.data() + 6, 2);
452  magic = ntohs(magic);
453  packetIndex = ntohs(packetIndex);
454  pktTotal = ntohs(pktTotal);
455  payloadSize = ntohs(payloadSize);
456 
457  if(magic != RETRANSMIT_MAGIC)
458  {
459  // Not a retransmission packet - if first packet, return as-is
460  if(!firstPacketReceived)
461  {
462  buffer = rawPacket;
463  return 0;
464  }
465  if(verbose)
466  __COUT_WARN__ << "receiveAll: skipping packet with bad magic 0x"
467  << std::hex << magic << std::dec << __E__;
468  continue;
469  }
470 
471  firstPacketReceived = true;
472  totalPackets = pktTotal;
473  totalKnown = true;
474 
475  // Extract payload (everything after the 8-byte header, limited by payloadSize)
476  size_t actualPayload = rawPacket.size() - RETRANSMIT_HEADER_SIZE;
477  if(actualPayload > payloadSize)
478  actualPayload = payloadSize;
479 
480  receivedPackets[packetIndex] =
481  rawPacket.substr(RETRANSMIT_HEADER_SIZE, actualPayload);
482 
483  if(verbose)
484  __COUTT__ << "receiveAll: received packet " << packetIndex << "/"
485  << totalPackets << " payload=" << actualPayload
486  << " total_received=" << receivedPackets.size() << std::endl;
487 
488  // Check if we have all packets
489  if(totalKnown && receivedPackets.size() >= static_cast<size_t>(totalPackets))
490  break;
491 
492  // Check overall timeout
493  auto elapsed =
494  std::chrono::duration_cast<std::chrono::seconds>(clock::now() - start);
495  if(elapsed.count() >=
496  static_cast<long>(timeoutSeconds * (retransmitMaxRetries + 1)))
497  {
498  if(verbose)
499  __COUT_WARN__ << "receiveAll: overall timeout reached" << __E__;
500  break;
501  }
502  }
503 
504  // Phase 2: Retransmit missing packets
505  if(totalKnown && receivedPackets.size() < static_cast<size_t>(totalPackets))
506  {
507  for(unsigned int retry = 0; retry < retransmitMaxRetries; ++retry)
508  {
509  // Build list of missing packet indices
510  std::set<uint16_t> missing;
511  for(uint16_t i = 0; i < totalPackets; ++i)
512  {
513  if(receivedPackets.find(i) == receivedPackets.end())
514  missing.insert(i);
515  }
516 
517  if(missing.empty())
518  break;
519 
520  if(verbose)
521  __COUT__ << "receiveAll: retry " << retry + 1 << "/"
522  << retransmitMaxRetries << ", requesting retransmit of "
523  << missing.size() << " packets" << __E__;
524 
525  // Build retransmit request: magic(2 bytes) + list of uint16 indices
526  std::string retransmitReq;
527  retransmitReq.resize(2 + missing.size() * 2);
528  uint16_t netMagic = htons(RETRANSMIT_MAGIC);
529  std::memcpy(&retransmitReq[0], &netMagic, 2);
530  size_t pos = 2;
531  for(uint16_t idx : missing)
532  {
533  uint16_t netIdx = htons(idx);
534  std::memcpy(&retransmitReq[pos], &netIdx, 2);
535  pos += 2;
536  }
537 
538  // Send retransmit request back to sender (acknowledge to last receive addr)
539  {
540  // Use sendto directly to fromAddress_ (the sender)
541  int sendToSize = sendto(socketNumber_,
542  retransmitReq.data(),
543  retransmitReq.size(),
544  0,
545  (struct sockaddr*)&(ReceiverSocket::fromAddress_),
546  sizeof(sockaddr_in));
547  if(sendToSize <= 0)
548  {
549  __COUT_WARN__ << "receiveAll: failed to send retransmit request: "
550  << strerror(errno) << __E__;
551  }
552  }
553 
554  // Receive retransmitted packets
555  while(true)
556  {
557  std::string rawPacket;
558  int rc = receive(
559  rawPacket, timeoutSeconds, 0 /*timeoutUSeconds*/, false /*verbose*/);
560  if(rc < 0)
561  break; // timeout, will retry
562 
563  if(rawPacket.size() < RETRANSMIT_HEADER_SIZE)
564  continue;
565 
566  uint16_t magic2, packetIndex2, pktTotal2, payloadSize2;
567  std::memcpy(&magic2, rawPacket.data() + 0, 2);
568  std::memcpy(&packetIndex2, rawPacket.data() + 2, 2);
569  std::memcpy(&pktTotal2, rawPacket.data() + 4, 2);
570  std::memcpy(&payloadSize2, rawPacket.data() + 6, 2);
571  magic2 = ntohs(magic2);
572  packetIndex2 = ntohs(packetIndex2);
573  pktTotal2 = ntohs(pktTotal2);
574  payloadSize2 = ntohs(payloadSize2);
575 
576  if(magic2 != RETRANSMIT_MAGIC)
577  continue;
578 
579  size_t actualPayload2 = rawPacket.size() - RETRANSMIT_HEADER_SIZE;
580  if(actualPayload2 > payloadSize2)
581  actualPayload2 = payloadSize2;
582 
583  receivedPackets[packetIndex2] =
584  rawPacket.substr(RETRANSMIT_HEADER_SIZE, actualPayload2);
585 
586  if(verbose)
587  __COUTT__ << "receiveAll: retransmit received packet " << packetIndex2
588  << "/" << totalPackets
589  << " total_received=" << receivedPackets.size()
590  << std::endl;
591 
592  // Check if we now have all packets
593  if(receivedPackets.size() >= static_cast<size_t>(totalPackets))
594  break;
595  }
596 
597  if(receivedPackets.size() >= static_cast<size_t>(totalPackets))
598  break;
599  }
600  }
601 
602  // Phase 3: Send "done" acknowledgment to sender (magic + 0xFFFF)
603  {
604  std::string doneSignal(4, '\0');
605  uint16_t netMagic = htons(RETRANSMIT_MAGIC);
606  uint16_t netDone = htons(0xFFFF);
607  std::memcpy(&doneSignal[0], &netMagic, 2);
608  std::memcpy(&doneSignal[2], &netDone, 2);
609  sendto(socketNumber_,
610  doneSignal.data(),
611  doneSignal.size(),
612  0,
613  (struct sockaddr*)&(ReceiverSocket::fromAddress_),
614  sizeof(sockaddr_in));
615  }
616 
617  // Phase 4: Assemble full buffer in order
618  if(!totalKnown || receivedPackets.empty())
619  {
620  __SS__ << "receiveAll: failed to receive any retransmission-mode packets"
621  << __E__;
622  __SS_THROW__;
623  }
624 
625  if(receivedPackets.size() < static_cast<size_t>(totalPackets))
626  {
627  // Build list of still-missing indices for the error message
628  std::string missingStr;
629  for(uint16_t i = 0; i < totalPackets; ++i)
630  {
631  if(receivedPackets.find(i) == receivedPackets.end())
632  {
633  if(!missingStr.empty())
634  missingStr += ", ";
635  missingStr += std::to_string(i);
636  }
637  }
638  __SS__ << "receiveAll: failed to receive all packets after "
639  << retransmitMaxRetries << " retransmit retries. "
640  << "Received " << receivedPackets.size() << "/" << totalPackets
641  << " packets. Missing indices: [" << missingStr << "]" << __E__;
642  __SS_THROW__;
643  }
644 
645  // Assemble in order
646  buffer.clear();
647  for(uint16_t i = 0; i < totalPackets; ++i)
648  buffer += receivedPackets[i];
649 
650  if(verbose)
651  __COUT__ << "receiveAll: successfully assembled " << buffer.size()
652  << " bytes from " << totalPackets << " packets in "
653  << std::chrono::duration_cast<std::chrono::milliseconds>(clock::now() -
654  start)
655  .count()
656  << " ms" << __E__;
657 
658  return 0;
659 } //end receiveAll()
660 
661 //==============================================================================
675  Socket& toSocket,
676  const std::string& sendBuffer,
677  unsigned int timeoutSeconds /* = 5 */,
678  unsigned int retransmitMaxRetries /* = 10 */,
679  bool verbose /* = false */)
680 {
681  using clock = std::chrono::steady_clock;
682  auto start = clock::now();
683 
684  // lockout other sender and receive attempts for the remainder of the scope
685  std::lock_guard<std::mutex> lock(sendAndReceiveMutex_);
686 
687  flush(); // make sure nothing to read before sending
688  send(toSocket, sendBuffer, verbose);
689 
690  __COUTT__ << " ----> Time sendAndReceiveAll '" << sendBuffer
691  << "' (socketNumber=" << socketNumber_ << ") check ==> "
692  << std::chrono::duration_cast<std::chrono::milliseconds>(clock::now() -
693  start)
694  .count()
695  << " milliseconds. PID=" << getpid()
696  << " TID=" << std::this_thread::get_id() << std::endl;
697 
698  std::string receiveBuffer;
699  if(receiveAll(receiveBuffer, timeoutSeconds, retransmitMaxRetries, verbose) < 0)
700  {
701  __SS__ << "Timeout (" << timeoutSeconds
702  << " s) or Error receiving retransmission response from remote ip:port "
703  << toSocket.getIPAddress() << ":" << toSocket.getPort()
704  << " to this ip:port " << Socket::getIPAddress() << ":"
705  << Socket::getPort() << __E__;
706  __SS_ONLY_THROW__;
707  }
708 
709  __COUTT__ << " ----> Time sendAndReceiveAll complete: " << receiveBuffer.size()
710  << " bytes (socketNumber=" << socketNumber_ << ") ==> "
711  << std::chrono::duration_cast<std::chrono::milliseconds>(clock::now() -
712  start)
713  .count()
714  << " milliseconds. PID=" << getpid()
715  << " TID=" << std::this_thread::get_id() << std::endl;
716 
717  return receiveBuffer;
718 } //end sendAndReceiveAll()
int receive(std::string &buffer, unsigned int timeoutSeconds=1, unsigned int timeoutUSeconds=0, bool verbose=false)
returns count of dropped packets
int acknowledge(const std::string &buffer, bool verbose=false, size_t maxChunkSize=1500, unsigned int interPacketGapUSeconds=0, bool enableRetransmission=false)
std::string sendAndReceiveAll(Socket &toSocket, const std::string &sendBuffer, unsigned int timeoutSeconds=5, unsigned int retransmitMaxRetries=10, bool verbose=false)
static constexpr uint16_t RETRANSMIT_MAGIC
Retransmission protocol constants.
int sendAll(const std::string &buffer, bool verbose=false, size_t maxChunkSize=65500, unsigned int interPacketGapUSeconds=0)
std::string sendAndReceive(Socket &toSocket, const std::string &sendBuffer, unsigned int timeoutSeconds=1, unsigned int timeoutUSeconds=0, bool verbose=false, unsigned int interPacketTimeoutUSeconds=10000)
int receiveAll(std::string &buffer, unsigned int timeoutSeconds=5, unsigned int retransmitMaxRetries=10, bool verbose=false)
defines used also by OtsConfigurationWizardSupervisor