otsdaq  3.06.00
TransceiverSocket.cc
1 #include "otsdaq/NetworkUtilities/TransceiverSocket.h"
2 #include "otsdaq/Macros/CoutMacros.h"
3 #include "otsdaq/MessageFacility/MessageFacility.h"
4 
5 #include <iostream>
6 #include <thread> // std::this_thread
7 
8 using namespace ots;
9 
10 //==============================================================================
11 TransceiverSocket::TransceiverSocket(void)
12 {
13  __COUT__ << "TransceiverSocket constructor " << __E__;
14 }
15 
16 //==============================================================================
17 TransceiverSocket::TransceiverSocket(std::string IPAddress, unsigned int port)
18  : Socket(IPAddress, port)
19 {
20  __COUT__ << "TransceiverSocket constructor " << IPAddress << ":" << port << __E__;
21 }
22 
23 //==============================================================================
24 TransceiverSocket::~TransceiverSocket(void) {}
25 
26 //==============================================================================
28 int TransceiverSocket::acknowledge(const std::string& buffer,
29  bool verbose /* = false */,
30  size_t maxChunkSize /* = 1500 */)
31 {
32  // lockout other senders for the remainder of the scope
33  std::lock_guard<std::mutex> lock(sendMutex_);
34 
35  if(verbose)
36  __COUTT__ << "Acknowledging on Socket Descriptor #: " << socketNumber_
37  << " from-port: " << ntohs(socketAddress_.sin_port)
38  << " to-port: " << ntohs(ReceiverSocket::fromAddress_.sin_port)
39  << std::endl;
40 
41  const size_t MAX_SEND_SIZE =
42  maxChunkSize > 65500u ? static_cast<size_t>(65500u) : maxChunkSize;
43  size_t offset = 0;
44  int sendToSize = 1;
45 
46  int sizeInBytes = 1;
47 
48  while(offset < buffer.size() && sendToSize > 0)
49  {
50  auto thisSize = sizeInBytes * (buffer.size() - offset) > MAX_SEND_SIZE
51  ? MAX_SEND_SIZE
52  : sizeInBytes * (buffer.size() - offset);
53  if(verbose)
54  __COUTTV__(thisSize);
55  sendToSize = sendto(socketNumber_,
56  &buffer[0] + offset,
57  thisSize,
58  0,
59  (struct sockaddr*)&(ReceiverSocket::fromAddress_),
60  sizeof(sockaddr_in));
61  offset += sendToSize / sizeInBytes;
62  }
63 
64  if(sendToSize <= 0)
65  {
66  __SS__ << "Error writing buffer from port "
67  << ntohs(TransmitterSocket::socketAddress_.sin_port) << ": "
68  << strerror(errno) << std::endl;
69  __SS_THROW__; //return -1;
70  }
71 
72  return 0;
73 } //end acknowledge()
74 
75 //==============================================================================
80  Socket& toSocket,
81  const std::string& sendBuffer,
82  unsigned int timeoutSeconds /* = 1 */,
83  unsigned int timeoutUSeconds /* = 0 */,
84  bool verbose /* = false */,
85  unsigned int interPacketTimeoutUSeconds /* = 10000 */)
86 {
87  using clock = std::chrono::steady_clock;
88  auto start = clock::now();
89 
90  // lockout other sender and receive attempts for the remainder of the scope
91  std::lock_guard<std::mutex> lock(
92  sendAndReceiveMutex_); //note that TransmitterSocket::sendMutex_ is not enough
93 
94  flush(); //make sure nothing to read before sending
95  send(toSocket, sendBuffer, verbose);
96 
97  __COUTT__ << " ----> Time sendAndReceive '" << sendBuffer
98  << "' (socketNumber=" << socketNumber_ << ") check ==> "
99  << std::chrono::duration_cast<std::chrono::milliseconds>(clock::now() -
100  start)
101  .count()
102  << " milliseconds. PID=" << getpid()
103  << " TID=" << std::this_thread::get_id() << std::endl;
104 
105  std::string receiveBuffer;
106  if(receive(receiveBuffer, timeoutSeconds, timeoutUSeconds, verbose) < 0)
107  {
108  __SS__ << "Timeout (" << timeoutSeconds + timeoutUSeconds / 1000000.
109  << " s) or Error receiving response buffer from remote ip:port "
110  << toSocket.getIPAddress() << ":" << toSocket.getPort()
111  << " to this ip:port " << Socket::getIPAddress() << ":"
112  << Socket::getPort() << __E__;
113  __SS_ONLY_THROW__;
114  }
115 
116  __COUTT__ << " ----> Time sendAndReceive '" << sendBuffer << "' got "
117  << receiveBuffer.size() << " (socketNumber=" << socketNumber_
118  << ") check ==> "
119  << std::chrono::duration_cast<std::chrono::milliseconds>(clock::now() -
120  start)
121  .count()
122  << " milliseconds. PID=" << getpid()
123  << " TID=" << std::this_thread::get_id() << std::endl;
124 
125  //assume response may be multiple packets! (and give interPacketTimeoutUSeconds unless called with lower timeout)
126  std::string receiveBuffer2;
127  while(receive(receiveBuffer2,
128  0 /*timeoutSeconds*/,
129  (timeoutSeconds == 0 && timeoutUSeconds < interPacketTimeoutUSeconds)
130  ? timeoutUSeconds
131  : interPacketTimeoutUSeconds,
132  verbose) >= 0)
133  {
134  receiveBuffer += receiveBuffer2; //append
135 
136  __COUTT__ << " ----> Time sendAndReceive +" << receiveBuffer2.size()
137  << " check ==> "
138  << std::chrono::duration_cast<std::chrono::milliseconds>(clock::now() -
139  start)
140  .count()
141  << " milliseconds." << std::endl;
142  }
143 
144  __COUTT__ << " ----> Time sendAndReceive " << receiveBuffer.size() << " check ==> "
145  << std::chrono::duration_cast<std::chrono::milliseconds>(clock::now() -
146  start)
147  .count()
148  << " milliseconds." << std::endl;
149 
150  return receiveBuffer;
151 } //end sendAndReceive()
int receive(std::string &buffer, unsigned int timeoutSeconds=1, unsigned int timeoutUSeconds=0, bool verbose=false)
returns count of dropped packets
int acknowledge(const std::string &buffer, bool verbose=false, size_t maxChunkSize=1500)
responds to last receive location
std::string sendAndReceive(Socket &toSocket, const std::string &sendBuffer, unsigned int timeoutSeconds=1, unsigned int timeoutUSeconds=0, bool verbose=false, unsigned int interPacketTimeoutUSeconds=10000)
defines used also by OtsConfigurationWizardSupervisor