otsdaq  3.07.00
TransceiverSocket.cc
1 #include "otsdaq/NetworkUtilities/TransceiverSocket.h"
2 #include "otsdaq/Macros/CoutMacros.h"
3 #include "otsdaq/MessageFacility/MessageFacility.h"
4 
5 #include <unistd.h>
6 #include <chrono>
7 #include <iostream>
8 #include <mutex>
9 #include <thread>
10 
11 using namespace ots;
12 
13 //==============================================================================
14 TransceiverSocket::TransceiverSocket(void)
15 {
16  __COUT__ << "TransceiverSocket constructor " << __E__;
17 }
18 
19 //==============================================================================
20 TransceiverSocket::TransceiverSocket(std::string IPAddress, unsigned int port)
21  : Socket(IPAddress, port)
22 {
23  __COUT__ << "TransceiverSocket constructor " << IPAddress << ":" << port << __E__;
24 }
25 
26 //==============================================================================
27 TransceiverSocket::~TransceiverSocket(void) {}
28 
29 //==============================================================================
31 int TransceiverSocket::acknowledge(const std::string& buffer,
32  bool verbose /* = false */,
33  size_t maxChunkSize /* = 1500 */,
34  unsigned int interPacketGapUSeconds /* = 0 */)
35 {
36  // lockout other senders for the remainder of the scope
37  std::lock_guard<std::mutex> lock(sendMutex_);
38 
39  if(verbose)
40  __COUTT__ << "Acknowledging on Socket Descriptor #: " << socketNumber_
41  << " from-port: " << ntohs(socketAddress_.sin_port)
42  << " to-port: " << ntohs(ReceiverSocket::fromAddress_.sin_port)
43  << std::endl;
44 
45  const size_t MAX_SEND_SIZE =
46  maxChunkSize > 65500u ? static_cast<size_t>(65500u) : maxChunkSize;
47  size_t offset = 0;
48  int sendToSize = 1;
49 
50  int sizeInBytes = 1;
51 
52  while(offset < buffer.size() && sendToSize > 0)
53  {
54  auto thisSize = sizeInBytes * (buffer.size() - offset) > MAX_SEND_SIZE
55  ? MAX_SEND_SIZE
56  : sizeInBytes * (buffer.size() - offset);
57  if(verbose)
58  __COUTTV__(thisSize);
59  sendToSize = sendto(socketNumber_,
60  &buffer[0] + offset,
61  thisSize,
62  0,
63  (struct sockaddr*)&(ReceiverSocket::fromAddress_),
64  sizeof(sockaddr_in));
65  offset += sendToSize / sizeInBytes;
66  if(interPacketGapUSeconds > 0 && offset < buffer.size() && sendToSize > 0)
67  usleep(interPacketGapUSeconds);
68  }
69 
70  if(sendToSize <= 0)
71  {
72  __SS__ << "Error writing buffer from port "
73  << ntohs(TransmitterSocket::socketAddress_.sin_port) << ": "
74  << strerror(errno) << std::endl;
75  __SS_THROW__; //return -1;
76  }
77 
78  return 0;
79 } //end acknowledge()
80 
81 //==============================================================================
86  Socket& toSocket,
87  const std::string& sendBuffer,
88  unsigned int timeoutSeconds /* = 1 */,
89  unsigned int timeoutUSeconds /* = 0 */,
90  bool verbose /* = false */,
91  unsigned int interPacketTimeoutUSeconds /* = 10000 */)
92 {
93  using clock = std::chrono::steady_clock;
94  auto start = clock::now();
95 
96  // lockout other sender and receive attempts for the remainder of the scope
97  std::lock_guard<std::mutex> lock(
98  sendAndReceiveMutex_); //note that TransmitterSocket::sendMutex_ is not enough
99 
100  flush(); //make sure nothing to read before sending
101  send(toSocket, sendBuffer, verbose);
102 
103  __COUTT__ << " ----> Time sendAndReceive '" << sendBuffer
104  << "' (socketNumber=" << socketNumber_ << ") check ==> "
105  << std::chrono::duration_cast<std::chrono::milliseconds>(clock::now() -
106  start)
107  .count()
108  << " milliseconds. PID=" << getpid()
109  << " TID=" << std::this_thread::get_id() << std::endl;
110 
111  std::string receiveBuffer;
112  if(receive(receiveBuffer, timeoutSeconds, timeoutUSeconds, verbose) < 0)
113  {
114  __SS__ << "Timeout (" << timeoutSeconds + timeoutUSeconds / 1000000.
115  << " s) or Error receiving response buffer from remote ip:port "
116  << toSocket.getIPAddress() << ":" << toSocket.getPort()
117  << " to this ip:port " << Socket::getIPAddress() << ":"
118  << Socket::getPort() << __E__;
119  __SS_ONLY_THROW__;
120  }
121 
122  __COUTT__ << " ----> Time sendAndReceive '" << sendBuffer << "' got "
123  << receiveBuffer.size() << " (socketNumber=" << socketNumber_
124  << ") check ==> "
125  << std::chrono::duration_cast<std::chrono::milliseconds>(clock::now() -
126  start)
127  .count()
128  << " milliseconds. PID=" << getpid()
129  << " TID=" << std::this_thread::get_id() << std::endl;
130 
131  //assume response may be multiple packets! (and give interPacketTimeoutUSeconds unless called with lower timeout)
132  std::string receiveBuffer2;
133  while(receive(receiveBuffer2,
134  0 /*timeoutSeconds*/,
135  (timeoutSeconds == 0 && timeoutUSeconds < interPacketTimeoutUSeconds)
136  ? timeoutUSeconds
137  : interPacketTimeoutUSeconds,
138  verbose) >= 0)
139  {
140  receiveBuffer += receiveBuffer2; //append
141 
142  __COUTT__ << " ----> Time sendAndReceive +" << receiveBuffer2.size()
143  << " check ==> "
144  << std::chrono::duration_cast<std::chrono::milliseconds>(clock::now() -
145  start)
146  .count()
147  << " milliseconds." << std::endl;
148  }
149 
150  __COUTT__ << " ----> Time sendAndReceive " << receiveBuffer.size() << " check ==> "
151  << std::chrono::duration_cast<std::chrono::milliseconds>(clock::now() -
152  start)
153  .count()
154  << " milliseconds." << std::endl;
155 
156  return receiveBuffer;
157 } //end sendAndReceive()
int receive(std::string &buffer, unsigned int timeoutSeconds=1, unsigned int timeoutUSeconds=0, bool verbose=false)
returns count of dropped packets
int acknowledge(const std::string &buffer, bool verbose=false, size_t maxChunkSize=1500, unsigned int interPacketGapUSeconds=0)
acknowledge() responds to last receive location
std::string sendAndReceive(Socket &toSocket, const std::string &sendBuffer, unsigned int timeoutSeconds=1, unsigned int timeoutUSeconds=0, bool verbose=false, unsigned int interPacketTimeoutUSeconds=10000)
defines used also by OtsConfigurationWizardSupervisor