Line data Source code
1 : #include "artdaq/DAQdata/Globals.hh"
2 : #define TRACE_NAME (app_name + "_BoardReaderApp").c_str() // NOLINT
3 :
4 : #include "artdaq-core/Utilities/ExceptionHandler.hh"
5 : #include "artdaq/Application/BoardReaderApp.hh"
6 :
7 : #include <boost/lexical_cast.hpp>
8 :
9 : #include <memory>
10 : #include <string>
11 :
12 0 : artdaq::BoardReaderApp::BoardReaderApp()
13 0 : : fragment_receiver_ptr_(nullptr)
14 : {
15 0 : }
16 :
17 : // *******************************************************************
18 : // *** The following methods implement the state machine operations.
19 : // *******************************************************************
20 :
21 0 : bool artdaq::BoardReaderApp::do_initialize(fhicl::ParameterSet const& pset, uint64_t timeout, uint64_t timestamp)
22 : {
23 0 : report_string_ = "";
24 0 : external_request_status_ = true;
25 :
26 : // in the following block, we first destroy the existing BoardReader
27 : // instance, then create a new one. Doing it in one step does not
28 : // produce the desired result since that creates a new instance and
29 : // then deletes the old one, and we need the opposite order.
30 0 : TLOG(TLVL_DEBUG + 32) << "Initializing first deleting old instance " << static_cast<void*>(fragment_receiver_ptr_.get());
31 0 : fragment_receiver_ptr_.reset(nullptr);
32 0 : fragment_receiver_ptr_ = std::make_unique<BoardReaderCore>(*this);
33 0 : TLOG(TLVL_DEBUG + 32) << "Initializing new BoardReaderCore at " << static_cast<void*>(fragment_receiver_ptr_.get()) << " with pset " << pset.to_string();
34 0 : external_request_status_ = fragment_receiver_ptr_->initialize(pset, timeout, timestamp);
35 0 : if (!external_request_status_)
36 : {
37 0 : report_string_ = "Error initializing ";
38 0 : report_string_.append(app_name + " ");
39 0 : report_string_.append("with ParameterSet = \"" + pset.to_string() + "\".");
40 : }
41 :
42 0 : TLOG(TLVL_DEBUG + 32) << "do_initialize(fhicl::ParameterSet, uint64_t, uint64_t): "
43 0 : << "Done initializing.";
44 0 : return external_request_status_;
45 : }
46 :
47 0 : bool artdaq::BoardReaderApp::do_start(art::RunID id, uint64_t timeout, uint64_t timestamp)
48 : {
49 0 : report_string_ = "";
50 0 : if (timeout == 0)
51 : {
52 0 : timeout = 3600; // seconds
53 : }
54 0 : fragment_receiver_ptr_->SetStartTransitionTimeout(timeout);
55 0 : external_request_status_ = true;
56 :
57 0 : boost::thread::attributes attrs;
58 0 : attrs.set_stack_size(4096 * 2000); // 8 MB
59 : try
60 : {
61 0 : fragment_output_thread_ = boost::thread(attrs, boost::bind(&BoardReaderCore::send_fragments, fragment_receiver_ptr_.get()));
62 : char tname[16]; // Size 16 - see man page pthread_setname_np(3) and/or prctl(2)
63 0 : snprintf(tname, sizeof(tname) - 1, "%d-FragOutput", my_rank); // NOLINT
64 0 : tname[sizeof(tname) - 1] = '\0'; // assure term. snprintf is not too evil :)
65 0 : auto handle = fragment_output_thread_.native_handle();
66 0 : pthread_setname_np(handle, tname);
67 0 : fragment_input_thread_ = boost::thread(attrs, boost::bind(&BoardReaderCore::receive_fragments, fragment_receiver_ptr_.get()));
68 0 : snprintf(tname, sizeof(tname) - 1, "%d-FragInput", my_rank); // NOLINT
69 0 : tname[sizeof(tname) - 1] = '\0'; // assure term. snprintf is not too evil :)
70 0 : handle = fragment_input_thread_.native_handle();
71 0 : pthread_setname_np(handle, tname);
72 : }
73 0 : catch (const boost::exception& e)
74 : {
75 0 : std::stringstream exception_string;
76 0 : exception_string << "Caught boost::exception starting Fragment Processing threads: " << boost::diagnostic_information(e) << ", errno=" << errno;
77 :
78 0 : ExceptionHandler(ExceptionHandlerRethrow::yes, exception_string.str());
79 0 : }
80 :
81 0 : auto start_wait = std::chrono::steady_clock::now();
82 0 : while (!fragment_receiver_ptr_->GetSenderThreadActive() || !fragment_receiver_ptr_->GetReceiverThreadActive())
83 : {
84 0 : if (TimeUtils::GetElapsedTime(start_wait) > timeout)
85 : {
86 0 : TLOG(TLVL_ERROR) << "Timeout occurred waiting for BoardReaderCore threads to start. Timeout = " << timeout << " s, Time waited = " << TimeUtils::GetElapsedTime(start_wait) << " s,"
87 0 : << " Receiver ready: " << std::boolalpha << fragment_receiver_ptr_->GetReceiverThreadActive() << ", Sender ready: " << fragment_receiver_ptr_->GetSenderThreadActive();
88 0 : external_request_status_ = false;
89 0 : break;
90 : }
91 0 : usleep(10000);
92 : }
93 :
94 : // Only if starting threads successful
95 0 : if (external_request_status_)
96 : {
97 0 : external_request_status_ = fragment_receiver_ptr_->start(id, timeout, timestamp);
98 : }
99 :
100 0 : if (!external_request_status_)
101 : {
102 0 : report_string_ = "Error starting ";
103 0 : report_string_.append(app_name + " ");
104 0 : report_string_.append("for run number ");
105 0 : report_string_.append(boost::lexical_cast<std::string>(id.run()));
106 0 : report_string_.append(", timeout ");
107 0 : report_string_.append(boost::lexical_cast<std::string>(timeout));
108 0 : report_string_.append(", timestamp ");
109 0 : report_string_.append(boost::lexical_cast<std::string>(timestamp));
110 0 : report_string_.append(".");
111 : }
112 :
113 0 : return external_request_status_;
114 0 : }
115 :
116 0 : bool artdaq::BoardReaderApp::do_stop(uint64_t timeout, uint64_t timestamp)
117 : {
118 0 : report_string_ = "";
119 0 : external_request_status_ = fragment_receiver_ptr_->stop(timeout, timestamp);
120 0 : if (!external_request_status_)
121 : {
122 0 : report_string_ = "Error stopping ";
123 0 : report_string_.append(app_name + ".");
124 0 : return false;
125 : }
126 0 : if (fragment_input_thread_.joinable())
127 : {
128 0 : TLOG(TLVL_DEBUG + 32) << "Joining fragment input (Generator) thread";
129 0 : fragment_input_thread_.join();
130 : }
131 0 : if (fragment_output_thread_.joinable())
132 : {
133 0 : TLOG(TLVL_DEBUG + 32) << "Joining fragment output (Sender) thread";
134 0 : fragment_output_thread_.join();
135 : }
136 :
137 0 : TLOG(TLVL_DEBUG + 32) << "BoardReader Stopped. Getting run statistics";
138 0 : int number_of_fragments_sent = -1;
139 0 : if (fragment_receiver_ptr_)
140 : {
141 0 : number_of_fragments_sent = fragment_receiver_ptr_->GetFragmentsProcessed();
142 : }
143 0 : TLOG(TLVL_DEBUG + 32) << "do_stop(uint64_t, uint64_t): "
144 0 : << "Number of fragments sent = " << number_of_fragments_sent
145 0 : << ".";
146 :
147 0 : return external_request_status_;
148 : }
149 :
150 0 : bool artdaq::BoardReaderApp::do_pause(uint64_t timeout, uint64_t timestamp)
151 : {
152 0 : report_string_ = "";
153 0 : external_request_status_ = fragment_receiver_ptr_->pause(timeout, timestamp);
154 0 : if (!external_request_status_)
155 : {
156 0 : report_string_ = "Error pausing ";
157 0 : report_string_.append(app_name + ".");
158 : }
159 :
160 0 : if (fragment_input_thread_.joinable()) fragment_input_thread_.join();
161 0 : if (fragment_output_thread_.joinable()) fragment_output_thread_.join();
162 0 : int number_of_fragments_sent = fragment_receiver_ptr_->GetFragmentsProcessed();
163 0 : TLOG(TLVL_DEBUG + 32) << "do_pause(uint64_t, uint64_t): "
164 0 : << "Number of fragments sent = " << number_of_fragments_sent
165 0 : << ".";
166 :
167 0 : return external_request_status_;
168 : }
169 :
170 0 : bool artdaq::BoardReaderApp::do_resume(uint64_t timeout, uint64_t timestamp)
171 : {
172 0 : report_string_ = "";
173 0 : if (timeout == 0)
174 : {
175 0 : timeout = 3600; // seconds
176 : }
177 0 : external_request_status_ = true;
178 :
179 0 : boost::thread::attributes attrs;
180 0 : attrs.set_stack_size(4096 * 2000); // 8 MB
181 : try
182 : {
183 0 : fragment_output_thread_ = boost::thread(attrs, boost::bind(&BoardReaderCore::send_fragments, fragment_receiver_ptr_.get()));
184 : char tname[16]; // Size 16 - see man page pthread_setname_np(3) and/or prctl(2)
185 0 : snprintf(tname, sizeof(tname) - 1, "%d-FragOutput", my_rank); // NOLINT
186 0 : tname[sizeof(tname) - 1] = '\0'; // assure term. snprintf is not too evil :)
187 0 : auto handle = fragment_output_thread_.native_handle();
188 0 : pthread_setname_np(handle, tname);
189 0 : fragment_input_thread_ = boost::thread(attrs, boost::bind(&BoardReaderCore::receive_fragments, fragment_receiver_ptr_.get()));
190 :
191 0 : snprintf(tname, sizeof(tname) - 1, "%d-FragInput", my_rank); // NOLINT
192 0 : tname[sizeof(tname) - 1] = '\0'; // assure term. snprintf is not too evil :)
193 0 : handle = fragment_input_thread_.native_handle();
194 0 : pthread_setname_np(handle, tname);
195 : }
196 0 : catch (const boost::exception& e)
197 : {
198 0 : std::stringstream exception_string;
199 0 : exception_string << "Caught boost::exception starting Fragment Processing threads: " << boost::diagnostic_information(e) << ", errno=" << errno;
200 :
201 0 : ExceptionHandler(ExceptionHandlerRethrow::yes, exception_string.str());
202 0 : }
203 :
204 0 : auto start_wait = std::chrono::steady_clock::now();
205 0 : while (!fragment_receiver_ptr_->GetSenderThreadActive() || !fragment_receiver_ptr_->GetReceiverThreadActive())
206 : {
207 0 : if (TimeUtils::GetElapsedTimeMicroseconds(start_wait) > timeout * 1000000)
208 : {
209 0 : TLOG(TLVL_ERROR) << "Timeout occurred waiting for BoardReaderCore threads to start. Timeout = " << timeout << " s, Time waited = " << TimeUtils::GetElapsedTime(start_wait) << " s,"
210 0 : << " Receiver ready: " << std::boolalpha << fragment_receiver_ptr_->GetReceiverThreadActive() << ", Sender ready: " << fragment_receiver_ptr_->GetSenderThreadActive();
211 0 : external_request_status_ = false;
212 0 : break;
213 : }
214 0 : usleep(10000);
215 : }
216 :
217 : // Only if starting threads successful
218 0 : if (external_request_status_)
219 : {
220 0 : external_request_status_ = fragment_receiver_ptr_->resume(timeout, timestamp);
221 : }
222 0 : if (!external_request_status_)
223 : {
224 0 : report_string_ = "Error resuming ";
225 0 : report_string_.append(app_name + ".");
226 : }
227 0 : return external_request_status_;
228 0 : }
229 :
230 0 : bool artdaq::BoardReaderApp::do_shutdown(uint64_t timeout)
231 : {
232 0 : report_string_ = "";
233 0 : external_request_status_ = fragment_receiver_ptr_->shutdown(timeout);
234 : // 02-Jun-2018, ELF & KAB: it's very, very unlikely that the following call is needed,
235 : // but just in case...
236 0 : if (fragment_input_thread_.joinable()) fragment_input_thread_.join();
237 0 : if (fragment_output_thread_.joinable()) fragment_output_thread_.join();
238 0 : if (!external_request_status_)
239 : {
240 0 : report_string_ = "Error shutting down ";
241 0 : report_string_.append(app_name + ".");
242 : }
243 0 : return external_request_status_;
244 : }
245 :
246 0 : bool artdaq::BoardReaderApp::do_soft_initialize(fhicl::ParameterSet const& pset, uint64_t timeout, uint64_t timestamp)
247 : {
248 0 : report_string_ = "";
249 0 : external_request_status_ = fragment_receiver_ptr_->soft_initialize(pset, timeout, timestamp);
250 0 : if (!external_request_status_)
251 : {
252 0 : report_string_ = "Error soft-initializing ";
253 0 : report_string_.append(app_name + " ");
254 0 : report_string_.append("with ParameterSet = \"" + pset.to_string() + "\".");
255 : }
256 0 : return external_request_status_;
257 : }
258 :
259 0 : bool artdaq::BoardReaderApp::do_reinitialize(fhicl::ParameterSet const& pset, uint64_t timeout, uint64_t timestamp)
260 : {
261 0 : external_request_status_ = fragment_receiver_ptr_->reinitialize(pset, timeout, timestamp);
262 0 : if (!external_request_status_)
263 : {
264 0 : report_string_ = "Error reinitializing ";
265 0 : report_string_.append(app_name + " ");
266 0 : report_string_.append("with ParameterSet = \"" + pset.to_string() + "\".");
267 : }
268 0 : return external_request_status_;
269 : }
270 :
271 0 : void artdaq::BoardReaderApp::BootedEnter()
272 : {
273 0 : TLOG(TLVL_DEBUG + 32) << "Booted state entry action called.";
274 :
275 : // the destruction of any existing BoardReaderCore has to happen in the
276 : // Booted Entry action rather than the Initialized Exit action because the
277 : // Initialized Exit action is only called after the "init" transition guard
278 : // condition is executed.
279 0 : fragment_receiver_ptr_.reset(nullptr);
280 0 : }
281 :
282 0 : bool artdaq::BoardReaderApp::do_meta_command(std::string const& command, std::string const& arg)
283 : {
284 0 : external_request_status_ = fragment_receiver_ptr_->metaCommand(command, arg);
285 0 : if (!external_request_status_)
286 : {
287 0 : report_string_ = "Error running meta-command on ";
288 0 : report_string_.append(app_name + " ");
289 0 : report_string_.append("with command = \"" + command + "\", arg = \"" + arg + "\".");
290 : }
291 0 : return external_request_status_;
292 : }
293 :
294 0 : std::string artdaq::BoardReaderApp::report(std::string const& which) const
295 : {
296 0 : std::string resultString;
297 :
298 : // if all that is requested is the latest state change result, return it
299 0 : if (which == "transition_status")
300 : {
301 0 : if (report_string_.length() > 0) { return report_string_; }
302 :
303 0 : return "Success";
304 : }
305 :
306 : //// if there is an outstanding report/message at the Commandable/Application
307 : //// level, prepend that
308 : // if (report_string_.length() > 0) {
309 : // resultString.append("*** Overall status message:\r\n");
310 : // resultString.append(report_string_ + "\r\n");
311 : // resultString.append("*** Requested report response:\r\n");
312 : // }
313 :
314 : // pass the request to the BoardReaderCore instance, if it's available
315 0 : if (fragment_receiver_ptr_ != nullptr)
316 : {
317 0 : resultString.append(fragment_receiver_ptr_->report(which));
318 : }
319 : else
320 : {
321 0 : resultString.append("This BoardReader has not yet been initialized and ");
322 0 : resultString.append("therefore can not provide reporting.");
323 : }
324 :
325 0 : return resultString;
326 0 : }
|