otsdaq  3.04.02
TransceiverSocket.cc
1 #include "otsdaq/NetworkUtilities/TransceiverSocket.h"
2 #include "otsdaq/Macros/CoutMacros.h"
3 #include "otsdaq/MessageFacility/MessageFacility.h"
4 
5 #include <iostream>
6 #include <thread> // std::this_thread
7 
8 using namespace ots;
9 
10 //==============================================================================
11 TransceiverSocket::TransceiverSocket(void)
12 {
13  __COUT__ << "TransceiverSocket constructor " << __E__;
14 }
15 
16 //==============================================================================
17 TransceiverSocket::TransceiverSocket(std::string IPAddress, unsigned int port)
18  : Socket(IPAddress, port)
19 {
20  __COUT__ << "TransceiverSocket constructor " << IPAddress << ":" << port << __E__;
21 }
22 
23 //==============================================================================
24 TransceiverSocket::~TransceiverSocket(void) {}
25 
26 //==============================================================================
28 int TransceiverSocket::acknowledge(const std::string& buffer, bool verbose)
29 {
30  // lockout other senders for the remainder of the scope
31  std::lock_guard<std::mutex> lock(sendMutex_);
32 
33  if(verbose)
34  __COUTT__ << "Acknowledging on Socket Descriptor #: " << socketNumber_
35  << " from-port: " << ntohs(socketAddress_.sin_port)
36  << " to-port: " << ntohs(ReceiverSocket::fromAddress_.sin_port)
37  << std::endl;
38 
39  constexpr size_t MAX_SEND_SIZE = 1500;
40  size_t offset = 0;
41  int sendToSize = 1;
42 
43  int sizeInBytes = 1;
44 
45  while(offset < buffer.size() && sendToSize > 0)
46  {
47  auto thisSize = sizeInBytes * (buffer.size() - offset) > MAX_SEND_SIZE
48  ? MAX_SEND_SIZE
49  : sizeInBytes * (buffer.size() - offset);
50  if(verbose)
51  __COUTTV__(thisSize);
52  sendToSize = sendto(socketNumber_,
53  &buffer[0] + offset,
54  thisSize,
55  0,
56  (struct sockaddr*)&(ReceiverSocket::fromAddress_),
57  sizeof(sockaddr_in));
58  offset += sendToSize / sizeInBytes;
59  }
60 
61  if(sendToSize <= 0)
62  {
63  __SS__ << "Error writing buffer from port "
64  << ntohs(TransmitterSocket::socketAddress_.sin_port) << ": "
65  << strerror(errno) << std::endl;
66  __SS_THROW__; //return -1;
67  }
68 
69  return 0;
70 } //end acknowledge()
71 
72 //==============================================================================
77  const std::string& sendBuffer,
78  unsigned int timeoutSeconds /* = 1 */,
79  unsigned int timeoutUSeconds /* = 0 */,
80  bool verbose /* = false */)
81 {
82  using clock = std::chrono::steady_clock;
83  auto start = clock::now();
84 
85  // lockout other sender and receive attempts for the remainder of the scope
86  std::lock_guard<std::mutex> lock(
87  sendAndReceiveMutex_); //note that TransmitterSocket::sendMutex_ is not enough
88 
89  flush(); //make sure nothing to read before sending
90  send(toSocket, sendBuffer, verbose);
91 
92  __COUTT__ << " ----> Time sendAndReceive '" << sendBuffer
93  << "' (socketNumber=" << socketNumber_ << ") check ==> "
94  << std::chrono::duration_cast<std::chrono::milliseconds>(clock::now() -
95  start)
96  .count()
97  << " milliseconds. PID=" << getpid()
98  << " TID=" << std::this_thread::get_id() << std::endl;
99 
100  std::string receiveBuffer;
101  if(receive(receiveBuffer, timeoutSeconds, timeoutUSeconds, verbose) < 0)
102  {
103  __SS__ << "Timeout (" << timeoutSeconds + timeoutUSeconds / 1000000.
104  << " s) or Error receiving response buffer from remote ip:port "
105  << toSocket.getIPAddress() << ":" << toSocket.getPort()
106  << " to this ip:port " << Socket::getIPAddress() << ":"
107  << Socket::getPort() << __E__;
108  __SS_ONLY_THROW__;
109  }
110 
111  __COUTT__ << " ----> Time sendAndReceive '" << sendBuffer << "' got "
112  << receiveBuffer.size() << " (socketNumber=" << socketNumber_
113  << ") check ==> "
114  << std::chrono::duration_cast<std::chrono::milliseconds>(clock::now() -
115  start)
116  .count()
117  << " milliseconds. PID=" << getpid()
118  << " TID=" << std::this_thread::get_id() << std::endl;
119 
120  //assume response may be multiple packets! (and give 10 ms unless called with lower timeout)
121  std::string receiveBuffer2;
122  while(receive(receiveBuffer2,
123  0 /*timeoutSeconds*/,
124  (timeoutSeconds == 0 && timeoutUSeconds < 10000)
125  ? timeoutUSeconds
126  : 10000 /*timeoutUSeconds*/,
127  verbose) >= 0)
128  {
129  receiveBuffer += receiveBuffer2; //append
130 
131  __COUTT__ << " ----> Time sendAndReceive +" << receiveBuffer2.size()
132  << " check ==> "
133  << std::chrono::duration_cast<std::chrono::milliseconds>(clock::now() -
134  start)
135  .count()
136  << " milliseconds." << std::endl;
137  }
138 
139  __COUTT__ << " ----> Time sendAndReceive " << receiveBuffer.size() << " check ==> "
140  << std::chrono::duration_cast<std::chrono::milliseconds>(clock::now() -
141  start)
142  .count()
143  << " milliseconds." << std::endl;
144 
145  return receiveBuffer;
146 } //end sendAndReceive()
int receive(std::string &buffer, unsigned int timeoutSeconds=1, unsigned int timeoutUSeconds=0, bool verbose=false)
returns count of dropped packets
std::string sendAndReceive(Socket &toSocket, const std::string &sendBuffer, unsigned int timeoutSeconds=1, unsigned int timeoutUSeconds=0, bool verbose=false)
int acknowledge(const std::string &buffer, bool verbose=false)
responds to last receive location
defines used also by OtsConfigurationWizardSupervisor