Line data Source code
1 : #include "artdaq/DAQdata/Globals.hh"
2 : #define TRACE_NAME (app_name + "_RequestReceiver").c_str()
3 :
4 : #include "artdaq/DAQdata/Globals.hh"
5 : #include "artdaq/DAQrate/detail/RequestMessage.hh"
6 : #include "artdaq/DAQrate/detail/RequestReceiver.hh"
7 :
8 : #include <boost/exception/all.hpp>
9 : #include <boost/throw_exception.hpp>
10 :
11 : #include <iterator>
12 : #include <limits>
13 :
14 : #include "canvas/Utilities/Exception.h"
15 : #include "cetlib_except/exception.h"
16 : #include "fhiclcpp/ParameterSet.h"
17 :
18 : #include "artdaq-core/Data/ContainerFragmentLoader.hh"
19 : #include "artdaq-core/Data/Fragment.hh"
20 : #include "artdaq-core/Utilities/ExceptionHandler.hh"
21 : #include "artdaq-core/Utilities/SimpleLookupPolicy.hh"
22 : #include "artdaq-core/Utilities/TimeUtils.hh"
23 :
24 : #include <arpa/inet.h>
25 : #include <netinet/in.h>
26 : #include <sys/poll.h>
27 : #include <algorithm>
28 : #include <fstream>
29 : #include <iomanip>
30 : #include <iostream>
31 : #include <iterator>
32 : #include "artdaq/DAQdata/TCPConnect.hh"
33 :
34 0 : artdaq::RequestReceiver::RequestReceiver()
35 0 : : request_stop_requested_(false)
36 0 : , requests_received_(0)
37 0 : , should_stop_(false)
38 0 : , request_addr_("227.128.12.26")
39 0 : , receive_requests_(false)
40 0 : {}
41 :
42 0 : artdaq::RequestReceiver::RequestReceiver(const fhicl::ParameterSet& ps, std::shared_ptr<RequestBuffer> output_buffer)
43 0 : : request_stop_requested_(false)
44 0 : , requests_received_(0)
45 0 : , should_stop_(false)
46 0 : , request_port_(ps.get<int>("request_port", 3001))
47 0 : , request_addr_(ps.get<std::string>("request_address", "227.128.12.26"))
48 0 : , multicast_in_addr_(ps.get<std::string>("multicast_interface_ip", "0.0.0.0"))
49 0 : , receive_requests_(ps.get<bool>("receive_requests", false))
50 0 : , end_of_run_timeout_ms_(ps.get<size_t>("end_of_run_quiet_timeout_ms", 1000))
51 0 : , requests_(output_buffer)
52 : {
53 0 : TLOG(TLVL_DEBUG + 32) << "RequestReceiver CONSTRUCTOR ps: " << ps.to_string();
54 0 : if (receive_requests_)
55 : {
56 0 : setupRequestListener();
57 : }
58 0 : }
59 :
60 0 : void artdaq::RequestReceiver::setupRequestListener()
61 : {
62 0 : TLOG(TLVL_INFO) << "Setting up request listen socket, rank=" << my_rank << ", address=" << request_addr_ << ":" << request_port_
63 0 : << ", multicast interface=" << multicast_in_addr_;
64 0 : request_socket_ = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
65 0 : if (request_socket_ < 0)
66 : {
67 0 : TLOG(TLVL_ERROR) << "Error creating socket for receiving data requests! err=" << strerror(errno);
68 0 : exit(1);
69 : }
70 :
71 : struct sockaddr_in si_me_request;
72 :
73 0 : int yes = 1;
74 0 : if (setsockopt(request_socket_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)) < 0)
75 : {
76 0 : TLOG(TLVL_ERROR) << "Unable to enable port reuse on request socket, err=" << strerror(errno);
77 0 : exit(1);
78 : }
79 0 : memset(&si_me_request, 0, sizeof(si_me_request));
80 0 : si_me_request.sin_family = AF_INET;
81 0 : si_me_request.sin_port = htons(request_port_);
82 0 : si_me_request.sin_addr.s_addr = htonl(INADDR_ANY);
83 0 : if (bind(request_socket_, reinterpret_cast<struct sockaddr*>(&si_me_request), sizeof(si_me_request)) == -1) // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
84 : {
85 0 : TLOG(TLVL_ERROR) << "Cannot bind request socket to port " << request_port_ << ", err=" << strerror(errno);
86 0 : exit(1);
87 : }
88 :
89 0 : if (request_addr_ != "localhost")
90 : {
91 : struct ip_mreq mreq;
92 0 : int sts = ResolveHost(request_addr_.c_str(), mreq.imr_multiaddr);
93 0 : if (sts == -1)
94 : {
95 0 : TLOG(TLVL_ERROR) << "Unable to resolve multicast request address, err=" << strerror(errno);
96 0 : exit(1);
97 : }
98 0 : sts = GetInterfaceForNetwork(multicast_in_addr_.c_str(), mreq.imr_interface);
99 0 : if (sts == -1)
100 : {
101 0 : TLOG(TLVL_ERROR) << "Unable to determine the multicast network interface for " << multicast_in_addr_;
102 0 : exit(1);
103 : }
104 : char addr_str[INET_ADDRSTRLEN];
105 0 : inet_ntop(AF_INET, &(mreq.imr_interface), addr_str, INET_ADDRSTRLEN);
106 0 : TLOG(TLVL_INFO) << "Successfully determined the multicast network interface for " << multicast_in_addr_ << ": " << addr_str << " (RequestReceiver)";
107 0 : if (setsockopt(request_socket_, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)) < 0)
108 : {
109 0 : TLOG(TLVL_ERROR) << "Unable to join multicast group, err=" << strerror(errno);
110 0 : exit(1);
111 : }
112 : }
113 0 : TLOG(TLVL_INFO) << "Done setting up request socket, rank=" << my_rank;
114 0 : }
115 :
116 0 : artdaq::RequestReceiver::~RequestReceiver()
117 : {
118 0 : stopRequestReception(true);
119 0 : }
120 :
121 0 : void artdaq::RequestReceiver::stopRequestReception(bool force)
122 : {
123 0 : std::unique_lock<std::mutex> lk(state_mutex_);
124 0 : if (!receive_requests_) return;
125 0 : if (requests_received_ == 0 && !force)
126 : {
127 0 : TLOG(TLVL_ERROR) << "Stop request received by RequestReceiver, but no requests have ever been received." << std::endl
128 0 : << "Check that UDP port " << request_port_ << " is open in the firewall config.";
129 : }
130 0 : should_stop_ = true;
131 0 : if (running_)
132 : {
133 0 : TLOG(TLVL_DEBUG + 32) << "Joining requestThread";
134 : try
135 : {
136 0 : if (requestThread_.joinable())
137 : {
138 0 : requestThread_.join();
139 : }
140 : }
141 0 : catch (...)
142 : {
143 : // IGNORED
144 0 : }
145 0 : bool once = true;
146 0 : while (running_)
147 : {
148 0 : if (once)
149 : {
150 0 : TLOG(TLVL_ERROR) << "running_ is true after thread join! Should NOT happen";
151 : }
152 0 : once = false;
153 0 : usleep(10000);
154 : }
155 : }
156 :
157 0 : if (request_socket_ != -1)
158 : {
159 0 : close(request_socket_);
160 0 : request_socket_ = -1;
161 : }
162 0 : TLOG(TLVL_INFO) << "RequestReceiver stopped, received " << requests_received_ << " request messages";
163 0 : requests_received_ = 0;
164 0 : }
165 :
166 0 : void artdaq::RequestReceiver::startRequestReception()
167 : {
168 0 : if (!receive_requests_) return;
169 0 : std::unique_lock<std::mutex> lk(state_mutex_);
170 0 : if (requestThread_.joinable())
171 : {
172 0 : requestThread_.join();
173 : }
174 0 : should_stop_ = false;
175 0 : request_stop_requested_ = false;
176 :
177 0 : if (request_socket_ == -1)
178 : {
179 0 : TLOG(TLVL_INFO) << "Connecting Request Reception socket";
180 0 : setupRequestListener();
181 : }
182 :
183 0 : TLOG(TLVL_INFO) << "Starting Request Reception Thread";
184 : try
185 : {
186 0 : requestThread_ = boost::thread(&RequestReceiver::receiveRequestsLoop, this);
187 : char tname[16]; // Size 16 - see man page pthread_setname_np(3) and/or prctl(2)
188 0 : snprintf(tname, sizeof(tname) - 1, "%d-ReqRecv", my_rank); // NOLINT
189 0 : tname[sizeof(tname) - 1] = '\0'; // assure term. snprintf is not too evil :)
190 0 : auto handle = requestThread_.native_handle();
191 0 : pthread_setname_np(handle, tname);
192 : }
193 0 : catch (const boost::exception& e)
194 : {
195 0 : TLOG(TLVL_ERROR) << "Caught boost::exception starting Request Receiver thread: " << boost::diagnostic_information(e) << ", errno=" << errno;
196 0 : std::cerr << "Caught boost::exception starting Request Receiver thread: " << boost::diagnostic_information(e) << ", errno=" << errno << std::endl;
197 0 : exit(5);
198 0 : }
199 0 : }
200 :
201 0 : void artdaq::RequestReceiver::receiveRequestsLoop()
202 : {
203 0 : running_ = true;
204 0 : requests_->reset();
205 0 : requests_->setRunning(true);
206 0 : while (!should_stop_)
207 : {
208 0 : TLOG(TLVL_DEBUG + 35) << "receiveRequestsLoop: Polling Request socket for new requests";
209 :
210 0 : if (request_socket_ == -1)
211 : {
212 0 : setupRequestListener();
213 : }
214 :
215 0 : int ms_to_wait = 10;
216 : struct pollfd ufds[1];
217 0 : ufds[0].fd = request_socket_;
218 0 : ufds[0].events = POLLIN | POLLPRI | POLLERR;
219 0 : int rv = poll(ufds, 1, ms_to_wait);
220 :
221 : // Continue loop if no message received or message does not have correct event ID
222 0 : if (rv <= 0 || (ufds[0].revents != POLLIN && ufds[0].revents != POLLPRI))
223 : {
224 0 : if (rv == 1 && ((ufds[0].revents & (POLLNVAL | POLLERR | POLLHUP)) != 0))
225 : {
226 0 : close(request_socket_);
227 0 : request_socket_ = -1;
228 : }
229 0 : if (request_stop_requested_ && TimeUtils::GetElapsedTimeMilliseconds(request_stop_timeout_) > end_of_run_timeout_ms_)
230 : {
231 0 : break;
232 : }
233 0 : continue;
234 : }
235 :
236 0 : TLOG(TLVL_DEBUG + 34) << "Received packet on Request channel";
237 0 : std::vector<uint8_t> buffer(MAX_REQUEST_MESSAGE_SIZE);
238 : struct sockaddr_in from;
239 0 : socklen_t len = sizeof(from);
240 0 : auto sts = recvfrom(request_socket_, &buffer[0], MAX_REQUEST_MESSAGE_SIZE, 0, reinterpret_cast<struct sockaddr*>(&from), &len); // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
241 0 : if (sts < 0)
242 : {
243 0 : TLOG(TLVL_ERROR) << "Error receiving request message header err=" << strerror(errno);
244 0 : close(request_socket_);
245 0 : request_socket_ = -1;
246 0 : continue;
247 0 : }
248 :
249 0 : auto hdr_buffer = reinterpret_cast<artdaq::detail::RequestHeader*>(&buffer[0]); // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
250 0 : TLOG(TLVL_DEBUG + 34) << "Request header word: 0x" << std::hex << hdr_buffer->header << std::dec << ", packet_count: " << hdr_buffer->packet_count << " from rank " << hdr_buffer->rank << ", " << inet_ntoa(from.sin_addr) << ":" << from.sin_port << ", run number: " << hdr_buffer->run_number;
251 0 : if (!hdr_buffer->isValid())
252 : {
253 0 : continue;
254 : }
255 :
256 : // 19-Dec-2018, KAB: added check on current run number
257 0 : if (run_number_ != 0 && hdr_buffer->run_number != run_number_)
258 : {
259 0 : TLOG(TLVL_WARNING) << "Received a Request Message with the wrong run number ("
260 0 : << hdr_buffer->run_number << "), expected " << run_number_
261 0 : << ", ignoring this request.";
262 0 : continue;
263 0 : }
264 :
265 0 : requests_received_++;
266 :
267 0 : if (hdr_buffer->mode == artdaq::detail::RequestMessageMode::EndOfRun)
268 : {
269 0 : TLOG(TLVL_INFO) << "Received Request Message with the EndOfRun marker. (Re)Starting 1-second timeout for receiving all outstanding requests...";
270 0 : request_stop_timeout_ = std::chrono::steady_clock::now();
271 0 : request_stop_requested_ = true;
272 : }
273 :
274 0 : std::vector<artdaq::detail::RequestPacket> pkt_buffer(hdr_buffer->packet_count);
275 0 : memcpy(&pkt_buffer[0], &buffer[sizeof(artdaq::detail::RequestHeader)], sizeof(artdaq::detail::RequestPacket) * hdr_buffer->packet_count);
276 :
277 0 : if (should_stop_)
278 : {
279 0 : break;
280 : }
281 :
282 0 : for (auto& buffer : pkt_buffer)
283 : {
284 0 : TLOG(TLVL_DEBUG + 36) << "Request Packet: hdr=" << /*std::dec <<*/ buffer.header << ", seq=" << buffer.sequence_id << ", ts=" << buffer.timestamp;
285 0 : if (!buffer.isValid()) continue;
286 0 : requests_->push(buffer.sequence_id, buffer.timestamp);
287 : }
288 0 : }
289 0 : TLOG(TLVL_DEBUG + 32) << "Ending Request Thread";
290 0 : running_ = false;
291 0 : requests_->setRunning(false);
292 0 : }
|