Line data Source code
1 : #include "TRACE/tracemf.h"
2 : #include "artdaq/DAQdata/Globals.hh" // include these 2 first -
3 : #define TRACE_NAME (app_name + "_DataReceiverCore").c_str()
4 :
5 : #include "artdaq/Application/DataReceiverCore.hh"
6 :
7 : #include "artdaq-core/Utilities/ExceptionHandler.hh"
8 :
9 : #include <memory>
10 : #include <string>
11 :
12 0 : artdaq::DataReceiverCore::DataReceiverCore()
13 0 : : stop_requested_(false)
14 0 : , pause_requested_(false)
15 0 : , run_is_paused_(false)
16 :
17 : {
18 0 : TLOG(TLVL_DEBUG + 32) << "Constructor";
19 0 : }
20 :
21 0 : artdaq::DataReceiverCore::~DataReceiverCore()
22 : {
23 0 : TLOG(TLVL_DEBUG + 32) << "Destructor";
24 0 : }
25 :
26 0 : bool artdaq::DataReceiverCore::initializeDataReceiver(fhicl::ParameterSet const& pset, fhicl::ParameterSet const& data_pset, fhicl::ParameterSet const& metric_pset)
27 : {
28 : // other parameters
29 0 : verbose_ = data_pset.get<bool>("verbose", true);
30 :
31 : // TRACE here so that mftrace_module and mftrace_iteration are ready by mftrace...should set it for all subsequent traces
32 0 : TLOG(TLVL_INFO) << "Initializing Data Receiver";
33 :
34 0 : if (metric_pset.is_empty())
35 : {
36 0 : TLOG(TLVL_INFO) << "No metric plugins appear to be defined";
37 : }
38 : try
39 : {
40 0 : metricMan->initialize(metric_pset, app_name);
41 : }
42 0 : catch (...)
43 : {
44 0 : ExceptionHandler(ExceptionHandlerRethrow::no,
45 : "Error loading metrics in DataReceiverCore::initialize()");
46 0 : }
47 :
48 0 : fhicl::ParameterSet art_pset = pset;
49 0 : if (art_pset.has_key("art"))
50 : {
51 0 : art_pset = art_pset.get<fhicl::ParameterSet>("art");
52 : }
53 : else
54 : {
55 0 : art_pset.erase("daq");
56 : }
57 :
58 : // Add the "metrics" block
59 0 : auto art_services_pset = art_pset.get<fhicl::ParameterSet>("services");
60 0 : auto art_services_ArtdaqSharedMemoryServiceInterface_pset = art_services_pset.get<fhicl::ParameterSet>("ArtdaqSharedMemoryServiceInterface");
61 0 : art_services_ArtdaqSharedMemoryServiceInterface_pset.put<fhicl::ParameterSet>("metrics", metric_pset);
62 0 : art_services_pset.erase("ArtdaqSharedMemoryServiceInterface");
63 0 : art_services_pset.put<fhicl::ParameterSet>("ArtdaqSharedMemoryServiceInterface", art_services_ArtdaqSharedMemoryServiceInterface_pset);
64 0 : art_pset.erase("services");
65 0 : art_pset.put<fhicl::ParameterSet>("services", art_services_pset);
66 :
67 0 : fhicl::ParameterSet data_tmp = data_pset;
68 0 : if (data_pset.has_key("expected_events_per_bunch"))
69 : {
70 0 : data_tmp.put<int>("expected_fragments_per_event", data_pset.get<int>("expected_events_per_bunch"));
71 : }
72 :
73 0 : if (data_pset.has_key("rank"))
74 : {
75 0 : if (my_rank >= 0 && data_pset.get<int>("rank") != my_rank)
76 : {
77 0 : TLOG(TLVL_WARNING) << "Rank specified at startup is different than rank specified at configure! Using rank received at configure!";
78 : }
79 0 : my_rank = data_pset.get<int>("rank");
80 : }
81 0 : if (my_rank == -1)
82 : {
83 0 : TLOG(TLVL_ERROR) << "Rank not specified at startup or in configuration! Aborting";
84 0 : exit(1);
85 : }
86 :
87 0 : event_store_ptr_ = std::make_shared<SharedMemoryEventManager>(data_tmp, art_pset);
88 0 : art_pset_ = art_pset;
89 0 : TLOG(TLVL_DEBUG + 32) << "Resulting art_pset_: \"" << art_pset_.to_string() << "\".";
90 :
91 0 : receiver_ptr_ = std::make_unique<artdaq::DataReceiverManager>(data_tmp, event_store_ptr_);
92 :
93 0 : return true;
94 0 : }
95 :
96 0 : bool artdaq::DataReceiverCore::start(art::RunID id)
97 : {
98 0 : TLOG((verbose_ ? TLVL_INFO : TLVL_DEBUG + 32)) << "Starting run " << id.run();
99 :
100 : // 13-Jul-2018, KAB: added code to update the art_pset inside the event store
101 : // with configuration archive information
102 : // so that the config info will be stored in the output art/ROOT file.
103 : // (Note that we don't bother looping over the config_archive_entries if that
104 : // map is empty, but we *do* still update the art configuration with art_pset_
105 : // at each begin-run because the config archive may be non-empty one time through
106 : // and then empty the next time.)
107 0 : fhicl::ParameterSet temp_pset = art_pset_;
108 0 : if (!config_archive_entries_.empty())
109 : {
110 0 : fhicl::ParameterSet config_pset;
111 0 : for (auto& entry : config_archive_entries_)
112 : {
113 0 : config_pset.put(entry.first, entry.second);
114 : }
115 0 : temp_pset.put_or_replace("configuration_documents", config_pset);
116 0 : }
117 0 : event_store_ptr_->UpdateArtConfiguration(temp_pset);
118 :
119 0 : stop_requested_.store(false);
120 0 : pause_requested_.store(false);
121 0 : run_is_paused_.store(false);
122 0 : metricMan->do_start();
123 0 : event_store_ptr_->startRun(id.run());
124 0 : receiver_ptr_->start_threads();
125 :
126 0 : TLOG((verbose_ ? TLVL_INFO : TLVL_DEBUG + 32)) << "Completed the Start transition for run " << event_store_ptr_->runID();
127 0 : return true;
128 0 : }
129 :
130 0 : bool artdaq::DataReceiverCore::stop()
131 : {
132 0 : TLOG((verbose_ ? TLVL_INFO : TLVL_DEBUG + 32)) << "Stopping run " << event_store_ptr_->runID();
133 0 : bool endSucceeded = false;
134 : int attemptsToEnd;
135 0 : receiver_ptr_->stop_threads();
136 :
137 : // 21-Jun-2013, KAB - the stop_requested_ variable must be set
138 : // before the flush lock so that the processFragments loop will
139 : // exit (after the timeout), the lock will be released (in the
140 : // processFragments method), and this method can continue.
141 0 : stop_requested_.store(true);
142 :
143 0 : TLOG(TLVL_DEBUG + 32) << "Ending run " << event_store_ptr_->runID();
144 0 : attemptsToEnd = 1;
145 0 : endSucceeded = event_store_ptr_->endRun();
146 0 : while (!endSucceeded && attemptsToEnd < 3)
147 : {
148 0 : ++attemptsToEnd;
149 0 : TLOG(TLVL_DEBUG + 32) << "Retrying EventStore::endRun()";
150 0 : endSucceeded = event_store_ptr_->endRun();
151 : }
152 0 : if (!endSucceeded)
153 : {
154 0 : TLOG(TLVL_ERROR)
155 0 : << "EventStore::endRun in stop method failed after three tries.";
156 : }
157 0 : TLOG(TLVL_DEBUG + 32) << "Done Ending run " << event_store_ptr_->runID();
158 :
159 0 : attemptsToEnd = 1;
160 0 : TLOG(TLVL_DEBUG + 32) << "stop: Calling EventStore::endOfData";
161 0 : endSucceeded = event_store_ptr_->endOfData();
162 0 : while (!endSucceeded && attemptsToEnd < 3)
163 : {
164 0 : ++attemptsToEnd;
165 0 : TLOG(TLVL_DEBUG + 32) << "Retrying EventStore::endOfData()";
166 0 : endSucceeded = event_store_ptr_->endOfData();
167 : }
168 :
169 0 : run_is_paused_.store(false);
170 0 : TLOG((verbose_ ? TLVL_INFO : TLVL_DEBUG + 32)) << "Completed the Stop transition for run " << event_store_ptr_->runID();
171 0 : return true;
172 : }
173 :
174 0 : bool artdaq::DataReceiverCore::pause()
175 : {
176 0 : TLOG((verbose_ ? TLVL_INFO : TLVL_DEBUG + 32)) << "Pausing run " << event_store_ptr_->runID();
177 0 : pause_requested_.store(true);
178 0 : run_is_paused_.store(true);
179 0 : TLOG((verbose_ ? TLVL_INFO : TLVL_DEBUG + 32)) << "Completed the Pause transition for run " << event_store_ptr_->runID();
180 0 : return true;
181 : }
182 :
183 0 : bool artdaq::DataReceiverCore::resume()
184 : {
185 0 : TLOG((verbose_ ? TLVL_INFO : TLVL_DEBUG + 32)) << "Resuming run " << event_store_ptr_->runID();
186 0 : pause_requested_.store(false);
187 0 : metricMan->do_start();
188 0 : event_store_ptr_->rolloverSubrun(true);
189 0 : run_is_paused_.store(false);
190 0 : TLOG((verbose_ ? TLVL_INFO : TLVL_DEBUG + 32)) << "Completed the Resume transition for run " << event_store_ptr_->runID();
191 0 : return true;
192 : }
193 :
194 0 : bool artdaq::DataReceiverCore::shutdown()
195 : {
196 0 : TLOG((verbose_ ? TLVL_INFO : TLVL_DEBUG + 32)) << "Starting Shutdown transition";
197 :
198 : /* We don't care about flushing data here. The only way to transition to the
199 : shutdown state is from a state where there is no data taking. All we have
200 : to do is signal the art input module that we're done taking data so that
201 : it can wrap up whatever it needs to do. */
202 :
203 0 : TLOG(TLVL_DEBUG + 32) << "shutdown: Shutting down DataReceiverManager";
204 0 : receiver_ptr_.reset(nullptr);
205 :
206 0 : bool endSucceeded = false;
207 0 : int attemptsToEnd = 1;
208 0 : TLOG(TLVL_DEBUG + 32) << "shutdown: Calling EventStore::endOfData";
209 0 : endSucceeded = event_store_ptr_->endOfData();
210 0 : while (!endSucceeded && attemptsToEnd < 3)
211 : {
212 0 : ++attemptsToEnd;
213 0 : TLOG(TLVL_DEBUG + 32) << "Retrying EventStore::endOfData()";
214 0 : endSucceeded = event_store_ptr_->endOfData();
215 : }
216 :
217 0 : TLOG(TLVL_DEBUG + 32) << "shutdown: Shutting down SharedMemoryEventManager";
218 0 : event_store_ptr_.reset();
219 :
220 0 : TLOG(TLVL_DEBUG + 32) << "shutdown: Shutting down MetricManager";
221 0 : metricMan->shutdown();
222 :
223 0 : TLOG(TLVL_DEBUG + 32) << "shutdown: Complete";
224 0 : TLOG((verbose_ ? TLVL_INFO : TLVL_DEBUG + 32)) << "Completed Shutdown transition";
225 0 : return endSucceeded;
226 : }
227 :
228 0 : bool artdaq::DataReceiverCore::soft_initialize(fhicl::ParameterSet const& pset)
229 : {
230 0 : TLOG(TLVL_DEBUG + 32) << "soft_initialize method called with DAQ "
231 0 : << "ParameterSet = \"" << pset.to_string()
232 0 : << "\".";
233 0 : return true;
234 : }
235 :
236 0 : bool artdaq::DataReceiverCore::reinitialize(fhicl::ParameterSet const& pset)
237 : {
238 0 : TLOG(TLVL_DEBUG + 32) << "reinitialize method called with DAQ "
239 0 : << "ParameterSet = \"" << pset.to_string()
240 0 : << "\".";
241 0 : event_store_ptr_ = nullptr;
242 0 : return initialize(pset);
243 : }
244 :
245 0 : bool artdaq::DataReceiverCore::rollover_subrun(uint64_t boundary, uint32_t subrun)
246 : {
247 0 : if (event_store_ptr_)
248 : {
249 0 : event_store_ptr_->rolloverSubrun(boundary, subrun, true);
250 0 : return true;
251 : }
252 0 : return false;
253 : }
254 :
255 0 : std::string artdaq::DataReceiverCore::report(std::string const& which) const
256 : {
257 0 : if (which == "open_event_count")
258 : {
259 0 : if (event_store_ptr_ != nullptr)
260 : {
261 0 : return std::to_string(event_store_ptr_->GetOpenEventCount());
262 : }
263 :
264 0 : return "-1";
265 : }
266 0 : if (which == "event_count")
267 : {
268 0 : if (receiver_ptr_ != nullptr)
269 : {
270 0 : return std::to_string(receiver_ptr_->count());
271 : }
272 :
273 0 : return "0";
274 : }
275 0 : else if (which == "stats")
276 : {
277 0 : if (event_store_ptr_ != nullptr)
278 0 : return event_store_ptr_->BuildStatisticsString();
279 : else
280 0 : return "-1";
281 : }
282 :
283 : // lots of cool stuff that we can do here
284 : // - report on the number of fragments received and the number
285 : // of events built (in the current or previous run
286 : // - report on the number of incomplete events in the EventStore
287 : // (if running)
288 0 : std::string tmpString;
289 0 : if (event_store_ptr_ != nullptr)
290 : {
291 0 : tmpString.append(app_name + " run number = " + std::to_string(event_store_ptr_->runID()) + ".\n");
292 : }
293 0 : tmpString.append("Command \"" + which + "\" is not currently supported.");
294 0 : return tmpString;
295 0 : }
|