Line data Source code
1 : #include "artdaq/DAQdata/Globals.hh"
2 : #define TRACE_NAME (app_name + "_TokenReceiver").c_str()
3 :
4 : #include <arpa/inet.h>
5 :
6 : #include <utility>
7 :
8 : #include <utility>
9 : #include "artdaq/DAQdata/TCP_listen_fd.hh"
10 : #include "artdaq/DAQrate/detail/TokenReceiver.hh"
11 :
12 0 : artdaq::TokenReceiver::TokenReceiver(const fhicl::ParameterSet& ps, std::shared_ptr<RoutingManagerPolicy> policy,
13 0 : size_t update_interval_msec)
14 0 : : token_port_(ps.get<int>("routing_token_port", 35555))
15 0 : , policy_(std::move(std::move(policy)))
16 0 : , update_interval_msec_(update_interval_msec)
17 0 : , token_socket_(-1)
18 0 : , token_epoll_fd_(-1)
19 0 : , thread_is_running_(false)
20 0 : , reception_is_paused_(false)
21 0 : , shutdown_requested_(false)
22 0 : , run_number_(0)
23 0 : , statsHelperPtr_(nullptr)
24 : {
25 0 : receive_token_events_ = std::vector<epoll_event>(policy_->GetReceiverCount() + 1);
26 0 : }
27 :
28 0 : artdaq::TokenReceiver::~TokenReceiver()
29 : {
30 0 : stopTokenReception(true);
31 0 : }
32 :
33 0 : void artdaq::TokenReceiver::startTokenReception()
34 : {
35 0 : if (token_thread_.joinable())
36 : {
37 0 : token_thread_.join();
38 : }
39 0 : boost::thread::attributes attrs;
40 0 : attrs.set_stack_size(4096 * 2000); // 8000 KB
41 :
42 0 : reception_is_paused_ = false;
43 0 : shutdown_requested_ = false;
44 :
45 0 : TLOG(TLVL_INFO) << "Starting Token Reception Thread";
46 : try
47 : {
48 0 : token_thread_ = boost::thread(attrs, boost::bind(&TokenReceiver::receiveTokensLoop_, this));
49 : char tname[16]; // Size 16 - see man page pthread_setname_np(3) and/or prctl(2)
50 0 : snprintf(tname, sizeof(tname) - 1, "%d-TokenRecv", my_rank); // NOLINT
51 0 : tname[sizeof(tname) - 1] = '\0'; // assure term. snprintf is not too evil :)
52 0 : auto handle = token_thread_.native_handle();
53 0 : pthread_setname_np(handle, tname);
54 : }
55 0 : catch (boost::exception const& e)
56 : {
57 0 : TLOG(TLVL_ERROR) << "Exception encountered starting Token Reception thread: " << boost::diagnostic_information(e) << ", errno=" << errno;
58 0 : std::cerr << "Exception encountered starting Token Reception thread: " << boost::diagnostic_information(e) << ", errno=" << errno << std::endl;
59 0 : exit(3);
60 0 : }
61 0 : received_token_count_ = 0;
62 0 : thread_is_running_ = true;
63 0 : TLOG(TLVL_INFO) << "Started Token Reception Thread";
64 0 : }
65 :
66 0 : void artdaq::TokenReceiver::stopTokenReception(bool force)
67 : {
68 0 : shutdown_requested_ = true;
69 0 : reception_is_paused_ = false;
70 0 : if (thread_is_running_)
71 : {
72 0 : if (received_token_count_ == 0 && !force)
73 : {
74 0 : TLOG(TLVL_DEBUG + 32) << "Stop request received by TokenReceiver, but no tokens have ever been received.";
75 : }
76 0 : TLOG(TLVL_DEBUG + 32) << "Joining tokenThread";
77 : try
78 : {
79 0 : if (token_thread_.joinable())
80 : {
81 0 : token_thread_.join();
82 : }
83 : }
84 0 : catch (...)
85 : {
86 : // IGNORED
87 0 : }
88 0 : thread_is_running_ = false;
89 : }
90 :
91 0 : if (token_socket_ != -1)
92 : {
93 0 : close(token_socket_);
94 0 : token_socket_ = -1;
95 0 : token_epoll_fd_ = -1;
96 : }
97 0 : }
98 :
99 0 : void artdaq::TokenReceiver::receiveTokensLoop_()
100 : {
101 0 : while (!shutdown_requested_)
102 : {
103 0 : TLOG(TLVL_DEBUG + 33) << "Receive Token loop start";
104 0 : if (token_socket_ == -1)
105 : {
106 0 : TLOG(TLVL_DEBUG + 32) << "Opening token listener socket";
107 0 : token_socket_ = TCP_listen_fd(token_port_, 3 * sizeof(detail::RoutingToken));
108 :
109 0 : if (token_epoll_fd_ != -1)
110 : {
111 0 : close(token_epoll_fd_);
112 : }
113 : struct epoll_event ev;
114 0 : token_epoll_fd_ = epoll_create1(0);
115 0 : ev.events = EPOLLIN;
116 0 : ev.data.fd = token_socket_;
117 0 : if (epoll_ctl(token_epoll_fd_, EPOLL_CTL_ADD, token_socket_, &ev) == -1)
118 : {
119 0 : TLOG(TLVL_ERROR) << "Could not register listen socket to epoll fd";
120 0 : exit(3);
121 : }
122 : }
123 0 : if (token_socket_ == -1 || token_epoll_fd_ == -1)
124 : {
125 0 : TLOG(TLVL_DEBUG + 32) << "One of the listen sockets was not opened successfully.";
126 0 : return;
127 : }
128 :
129 0 : auto nfds = epoll_wait(token_epoll_fd_, &receive_token_events_[0], receive_token_events_.size(), update_interval_msec_);
130 0 : if (nfds == -1)
131 : {
132 0 : TLOG(TLVL_ERROR) << "Error status received from epoll_wait, exiting with code " << EXIT_FAILURE << ", errno=" << errno << " (" << strerror(errno) << ")";
133 0 : perror("epoll_wait");
134 0 : exit(EXIT_FAILURE);
135 : }
136 :
137 0 : while (reception_is_paused_ && !shutdown_requested_)
138 : {
139 0 : usleep(10000);
140 : }
141 :
142 0 : TLOG(TLVL_DEBUG + 35) << "Received " << nfds << " events on token sockets";
143 0 : for (auto n = 0; n < nfds; ++n)
144 : {
145 0 : if (receive_token_events_[n].data.fd == token_socket_)
146 : {
147 0 : TLOG(TLVL_DEBUG + 32) << "Accepting new connection on token_socket";
148 : sockaddr_in addr;
149 0 : socklen_t arglen = sizeof(addr);
150 0 : auto conn_sock = accept(token_socket_, reinterpret_cast<struct sockaddr*>(&addr), &arglen); // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
151 0 : fcntl(conn_sock, F_SETFL, O_NONBLOCK); // set O_NONBLOCK
152 :
153 0 : if (conn_sock == -1)
154 : {
155 0 : TLOG(TLVL_ERROR) << "Error status received from accept, exiting with code " << EXIT_FAILURE << ", errno=" << errno << " (" << strerror(errno) << ")";
156 0 : perror("accept");
157 0 : exit(EXIT_FAILURE);
158 : }
159 :
160 0 : receive_token_addrs_[conn_sock] = std::string(inet_ntoa(addr.sin_addr));
161 0 : TLOG(TLVL_DEBUG + 32) << "New fd is " << conn_sock << " for data-receiver at " << receive_token_addrs_[conn_sock];
162 : struct epoll_event ev;
163 0 : ev.events = EPOLLIN;
164 0 : ev.data.fd = conn_sock;
165 0 : if (epoll_ctl(token_epoll_fd_, EPOLL_CTL_ADD, conn_sock, &ev) == -1)
166 : {
167 0 : TLOG(TLVL_ERROR) << "Error status received from epoll_ctl, exiting with code " << EXIT_FAILURE << ", errno=" << errno << " (" << strerror(errno) << ")";
168 0 : perror("epoll_ctl: conn_sock");
169 0 : exit(EXIT_FAILURE);
170 : }
171 : }
172 0 : else if ((receive_token_events_[n].events & EPOLLIN) != 0)
173 : {
174 0 : auto startTime = artdaq::MonitoredQuantity::getCurrentTime();
175 :
176 : detail::RoutingToken buff;
177 0 : int sts = recv(receive_token_events_[n].data.fd, &buff, sizeof(detail::RoutingToken), MSG_WAITALL);
178 0 : if (sts == 0)
179 : {
180 0 : TLOG(TLVL_WARNING) << "Received 0-size token from " << receive_token_addrs_[receive_token_events_[n].data.fd] << ", closing socket";
181 0 : receive_token_addrs_.erase(receive_token_events_[n].data.fd);
182 0 : close(receive_token_events_[n].data.fd);
183 0 : epoll_ctl(token_epoll_fd_, EPOLL_CTL_DEL, receive_token_events_[n].data.fd, nullptr);
184 : }
185 0 : else if (sts < 0 && errno == EAGAIN)
186 : {
187 0 : TLOG(TLVL_DEBUG + 32) << "No more tokens from this rank. Continuing poll loop.";
188 0 : }
189 0 : else if (sts < 0)
190 : {
191 0 : TLOG(TLVL_ERROR) << "Error reading from token socket: sts=" << sts << ", errno=" << errno;
192 0 : receive_token_addrs_.erase(receive_token_events_[n].data.fd);
193 0 : close(receive_token_events_[n].data.fd);
194 0 : epoll_ctl(token_epoll_fd_, EPOLL_CTL_DEL, receive_token_events_[n].data.fd, nullptr);
195 : }
196 0 : else if (sts == sizeof(detail::RoutingToken) && buff.header != TOKEN_MAGIC)
197 : {
198 0 : TLOG(TLVL_ERROR) << "Received invalid token from " << receive_token_addrs_[receive_token_events_[n].data.fd] << " sts=" << sts;
199 0 : }
200 0 : else if (sts == sizeof(detail::RoutingToken))
201 : {
202 0 : TLOG(TLVL_DEBUG + 32) << "Received token from " << buff.rank << " indicating " << buff.new_slots_free << " slots are free. (run=" << buff.run_number << ")";
203 0 : if (buff.run_number != run_number_)
204 : {
205 0 : TLOG(TLVL_DEBUG + 32) << "Received token from a different run number! Current = " << run_number_ << ", token = " << buff.run_number << ", ignoring (n=" << buff.new_slots_free << ")";
206 : }
207 : else
208 : {
209 0 : received_token_count_ += buff.new_slots_free;
210 0 : policy_->AddReceiverToken(buff.rank, buff.new_slots_free);
211 : }
212 : }
213 0 : auto delta_time = artdaq::MonitoredQuantity::getCurrentTime() - startTime;
214 0 : if (statsHelperPtr_ != nullptr) { statsHelperPtr_->addSample(tokens_received_stat_key_, delta_time); }
215 : }
216 : else
217 : {
218 0 : TLOG(TLVL_DEBUG + 32) << "Received event mask " << receive_token_events_[n].events << " from token fd " << receive_token_events_[n].data.fd;
219 : }
220 : }
221 : }
222 : }
|