Line data Source code
1 : #include "TRACE/tracemf.h"
2 : #include "artdaq/DAQdata/Globals.hh" // Before trace.h gets included in ConcurrentQueue (from GlobalQueue)
3 : #define TRACE_NAME (app_name + "_RequestSender").c_str()
4 : #include "artdaq/DAQrate/detail/RequestSender.hh"
5 :
6 : #include "artdaq/DAQdata/TCPConnect.hh"
7 :
8 : #include "fhiclcpp/ParameterSet.h"
9 :
10 : #include <boost/thread.hpp>
11 :
12 : #include <dlfcn.h>
13 : #include <chrono>
14 : #include <cstring>
15 : #include <fstream>
16 : #include <iomanip>
17 : #include <mutex>
18 : #include <sstream>
19 : #include <thread>
20 : #include <utility>
21 :
22 : namespace artdaq {
23 19 : RequestSender::RequestSender(const fhicl::ParameterSet& pset)
24 19 : : send_requests_(pset.get<bool>("send_requests", false))
25 19 : , initialized_(false)
26 57 : , request_address_(pset.get<std::string>("request_address", "227.128.12.26"))
27 38 : , request_port_(pset.get<int>("request_port", 3001))
28 38 : , request_delay_(pset.get<size_t>("request_delay_ms", 0) * 1000)
29 38 : , request_shutdown_timeout_us_(pset.get<size_t>("request_shutdown_timeout_us", 100000))
30 19 : , request_socket_(-1)
31 95 : , multicast_out_addr_(pset.get<std::string>("multicast_interface_ip", pset.get<std::string>("output_address", "0.0.0.0")))
32 19 : , request_mode_(detail::RequestMessageMode::Normal)
33 38 : , min_request_interval_ms_(pset.get<size_t>("min_request_interval_ms", 100))
34 19 : , request_sending_(0)
35 19 : , requests_sent_(0)
36 57 : , run_number_(0)
37 : {
38 57 : TLOG(TLVL_DEBUG) << "RequestSender CONSTRUCTOR pset=" << pset.to_string();
39 19 : setup_requests_();
40 :
41 38 : TLOG(TLVL_DEBUG + 35) << "artdaq::RequestSender::RequestSender ctor - reader_thread_ initialized";
42 19 : initialized_ = true;
43 19 : }
44 :
45 36 : RequestSender::~RequestSender()
46 : {
47 57 : TLOG(TLVL_INFO) << "Shutting down RequestSender: Waiting for " << request_sending_.load() << " requests to be sent (total sent: " << requests_sent_ << ")";
48 :
49 19 : auto start_time = std::chrono::steady_clock::now();
50 :
51 38 : while (request_sending_.load() > 0 && request_shutdown_timeout_us_ + request_delay_ > TimeUtils::GetElapsedTimeMicroseconds(start_time))
52 : {
53 0 : usleep(1000);
54 : }
55 : {
56 19 : std::lock_guard<std::mutex> lk(request_mutex_);
57 19 : std::lock_guard<std::mutex> lk2(request_send_mutex_);
58 19 : }
59 57 : TLOG(TLVL_INFO) << "Shutting down RequestSender: request_socket_: " << request_socket_;
60 19 : if (request_socket_ != -1)
61 : {
62 1 : if (shutdown(request_socket_, 2) != 0 && errno == ENOTSOCK)
63 : {
64 0 : TLOG(TLVL_ERROR) << "Shutdown of request_socket_ resulted in ENOTSOCK. NOT Closing file descriptor!";
65 : }
66 : else
67 : {
68 1 : close(request_socket_);
69 : }
70 1 : request_socket_ = -1;
71 : }
72 36 : }
73 :
74 3 : void RequestSender::SetRequestMode(detail::RequestMessageMode mode)
75 : {
76 : {
77 3 : std::lock_guard<std::mutex> lk(request_mutex_);
78 3 : request_mode_ = mode;
79 3 : }
80 3 : SendRequest(true);
81 3 : }
82 :
83 19 : void RequestSender::setup_requests_()
84 : {
85 19 : if (send_requests_)
86 : {
87 1 : request_socket_ = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
88 1 : if (request_socket_ < 0)
89 : {
90 0 : TLOG(TLVL_ERROR) << "I failed to create the socket for sending Data Requests! err=" << strerror(errno);
91 0 : exit(1);
92 : }
93 1 : int sts = ResolveHost(request_address_.c_str(), request_port_, request_addr_);
94 1 : if (sts == -1)
95 : {
96 0 : TLOG(TLVL_ERROR) << "Unable to resolve Data Request address, err=" << strerror(errno);
97 0 : exit(1);
98 : }
99 :
100 : /* if (multicast_out_addr_ == "0.0.0.0")
101 : {
102 : char hostname[HOST_NAME_MAX];
103 : sts = gethostname(hostname, HOST_NAME_MAX);
104 : multicast_out_addr_ = std::string(hostname);
105 : if (sts < 0)
106 : {
107 : TLOG(TLVL_ERROR) << "Could not get current hostname, err=" << strerror(errno);
108 : exit(1);
109 : }
110 : }*/
111 :
112 : // For 0.0.0.0, use system-specified IP_MULTICAST_IF
113 1 : if (multicast_out_addr_ != "localhost" && multicast_out_addr_ != "0.0.0.0")
114 : {
115 : struct in_addr addr;
116 0 : sts = GetInterfaceForNetwork(multicast_out_addr_.c_str(), addr);
117 : // sts = ResolveHost(multicast_out_addr_.c_str(), addr);
118 0 : if (sts == -1)
119 : {
120 0 : TLOG(TLVL_ERROR) << "Unable to determine the multicast interface address for " << multicast_out_addr_ << ", err=" << strerror(errno);
121 0 : exit(1);
122 : }
123 : char addr_str[INET_ADDRSTRLEN];
124 0 : inet_ntop(AF_INET, &(addr), addr_str, INET_ADDRSTRLEN);
125 0 : TLOG(TLVL_INFO) << "Successfully determined the multicast network interface for " << multicast_out_addr_ << ": " << addr_str;
126 :
127 0 : if (setsockopt(request_socket_, IPPROTO_IP, IP_MULTICAST_IF, &addr, sizeof(addr)) == -1)
128 : {
129 0 : TLOG(TLVL_ERROR) << "Cannot set outgoing interface, err=" << strerror(errno);
130 0 : exit(1);
131 : }
132 : }
133 1 : int yes = 1;
134 1 : if (setsockopt(request_socket_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)) < 0)
135 : {
136 0 : TLOG(TLVL_ERROR) << "Unable to enable port reuse on request socket, err=" << strerror(errno);
137 0 : exit(1);
138 : }
139 1 : if (setsockopt(request_socket_, IPPROTO_IP, IP_MULTICAST_LOOP, &yes, sizeof(yes)) < 0)
140 : {
141 0 : TLOG(TLVL_ERROR) << "Unable to enable multicast loopback on request socket, err=" << strerror(errno);
142 0 : exit(1);
143 : }
144 1 : if (setsockopt(request_socket_, SOL_SOCKET, SO_BROADCAST, &yes, sizeof(yes)) == -1)
145 : {
146 0 : TLOG(TLVL_ERROR) << "Cannot set request socket to broadcast, err=" << strerror(errno);
147 0 : exit(1);
148 : }
149 : }
150 19 : }
151 :
152 4 : void RequestSender::do_send_request_()
153 : {
154 4 : if (!send_requests_)
155 : {
156 0 : request_sending_--;
157 0 : return;
158 : }
159 4 : if (request_socket_ == -1)
160 : {
161 0 : setup_requests_();
162 : }
163 :
164 8 : TLOG(TLVL_DEBUG + 33) << "Waiting for " << request_delay_ << " microseconds.";
165 4 : std::this_thread::sleep_for(std::chrono::microseconds(request_delay_));
166 :
167 8 : TLOG(TLVL_DEBUG + 33) << "Creating RequestMessage";
168 4 : detail::RequestMessage message;
169 4 : message.setRank(my_rank);
170 4 : message.setRunNumber(run_number_);
171 : {
172 4 : std::lock_guard<std::mutex> lk(request_mutex_);
173 10 : for (auto& req : active_requests_)
174 : {
175 12 : TLOG(TLVL_DEBUG + 36) << "Adding a request with sequence ID " << req.first << ", timestamp " << req.second << " to request message";
176 6 : message.addRequest(req.first, req.second);
177 : }
178 8 : TLOG(TLVL_DEBUG + 33) << "Setting mode flag in Message Header to " << static_cast<int>(request_mode_);
179 4 : message.setMode(request_mode_);
180 4 : }
181 : char str[INET_ADDRSTRLEN];
182 4 : inet_ntop(AF_INET, &(request_addr_.sin_addr), str, INET_ADDRSTRLEN);
183 4 : std::lock_guard<std::mutex> lk2(request_send_mutex_);
184 8 : TLOG(TLVL_DEBUG + 33) << "Sending request for " << message.size() << " events to multicast group " << str
185 4 : << ", port " << request_port_ << ", interface " << multicast_out_addr_;
186 4 : auto buf = message.GetMessage();
187 4 : auto sts = sendto(request_socket_, &buf[0], buf.size(), 0, reinterpret_cast<struct sockaddr*>(&request_addr_), sizeof(request_addr_)); // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
188 4 : if (sts < 0 || static_cast<size_t>(sts) != buf.size())
189 : {
190 0 : TLOG(TLVL_ERROR) << "Error sending request message err=" << strerror(errno) << "sts=" << sts;
191 0 : request_socket_ = -1;
192 0 : request_sending_--;
193 0 : return;
194 : }
195 8 : TLOG(TLVL_DEBUG + 33) << "Done sending request sts=" << sts;
196 4 : request_sending_--;
197 4 : requests_sent_++;
198 4 : }
199 :
200 2540 : void RequestSender::SendRequest(bool endOfRunOnly)
201 : {
202 2540 : while (!initialized_)
203 : {
204 0 : usleep(1000);
205 : }
206 :
207 2540 : if (!send_requests_)
208 : {
209 2536 : return;
210 : }
211 : {
212 4 : std::lock_guard<std::mutex> lk(request_mutex_);
213 4 : if (endOfRunOnly && request_mode_ != detail::RequestMessageMode::EndOfRun)
214 : {
215 0 : return;
216 : }
217 4 : }
218 4 : last_request_send_time_ = std::chrono::steady_clock::now();
219 4 : request_sending_++;
220 8 : boost::thread request([this] { do_send_request_(); });
221 4 : request.detach();
222 4 : }
223 :
224 446 : void RequestSender::AddRequest(Fragment::sequence_id_t seqID, Fragment::timestamp_t timestamp)
225 : {
226 446 : while (!initialized_)
227 : {
228 0 : usleep(1000);
229 : }
230 :
231 : {
232 446 : std::lock_guard<std::mutex> lk(request_mutex_);
233 446 : if (active_requests_.count(seqID) == 0u)
234 : {
235 892 : TLOG(TLVL_DEBUG + 37) << "Adding request for sequence ID " << seqID << " and timestamp " << timestamp << " to request list.";
236 446 : active_requests_[seqID] = timestamp;
237 : }
238 :
239 446 : while (active_requests_.size() > detail::RequestMessage::max_request_count())
240 : {
241 0 : TLOG(TLVL_WARNING) << "Erasing request with seqID " << active_requests_.begin()->first << " due to over-large request list size! (" << active_requests_.size() << " / " << detail::RequestMessage::max_request_count() << ")";
242 0 : active_requests_.erase(active_requests_.begin());
243 : }
244 446 : }
245 446 : SendRequest(TimeUtils::GetElapsedTimeMilliseconds(last_request_send_time_) < min_request_interval_ms_);
246 446 : }
247 :
248 441 : void RequestSender::RemoveRequest(Fragment::sequence_id_t seqID)
249 : {
250 441 : while (!initialized_)
251 : {
252 0 : usleep(1000);
253 : }
254 441 : std::lock_guard<std::mutex> lk(request_mutex_);
255 882 : TLOG(TLVL_DEBUG + 38) << "Removing request for sequence ID " << seqID << " from request list.";
256 441 : active_requests_.erase(seqID);
257 441 : }
258 : } // namespace artdaq
|