Line data Source code
1 : #include "TRACE/tracemf.h"
2 : #define TRACE_NAME "GenToBuffer"
3 :
4 : #include "artdaq-core/Utilities/configureMessageFacility.hh"
5 : #include "artdaq/Application/LoadParameterSet.hh"
6 : #include "artdaq/DAQdata/Globals.hh"
7 : #include "artdaq/DAQrate/FragmentBuffer.hh"
8 : #include "artdaq/DAQrate/RequestBuffer.hh"
9 : #include "artdaq/Generators/CommandableFragmentGenerator.hh"
10 : #include "artdaq/Generators/makeCommandableFragmentGenerator.hh"
11 :
12 : #include "fhiclcpp/ParameterSet.h"
13 : #include "fhiclcpp/types/Atom.h"
14 : #include "fhiclcpp/types/Comment.h"
15 : #include "fhiclcpp/types/Name.h"
16 : #include "fhiclcpp/types/Table.h"
17 : #include "fhiclcpp/types/TableFragment.h"
18 :
19 : #include <boost/program_options.hpp>
20 : namespace bpo = boost::program_options;
21 : #include <boost/thread.hpp>
22 :
23 : #include <thread>
24 :
25 : namespace artdaq {
26 : /**
27 : * @brief Test fixture for GenToBuffer_t
28 : */
29 : class GenToBufferTest
30 : {
31 : public:
32 : /**
33 : * @brief GenToBufferTest constructor
34 : * @param ps ParameterSet for GenToBufferTest
35 : */
36 0 : explicit GenToBufferTest(fhicl::ParameterSet const& ps)
37 0 : : generator_ptr_(nullptr)
38 0 : , fragment_buffer_ptr_(new FragmentBuffer(ps))
39 0 : , request_buffer_ptr_(new RequestBuffer(1))
40 0 : , fragment_count_(0)
41 0 : , running_(false)
42 : {
43 0 : generator_ptr_ = makeCommandableFragmentGenerator(ps.get<std::string>("generator"), ps);
44 0 : generator_ptr_->SetRequestBuffer(request_buffer_ptr_);
45 0 : fragment_buffer_ptr_->SetRequestBuffer(request_buffer_ptr_);
46 0 : }
47 :
48 : /**
49 : * @brief Start the test fixture
50 : * @param run_number Run Number to use in Start commands
51 : */
52 0 : void start(int run_number)
53 : {
54 0 : metricMan->do_start();
55 0 : request_buffer_ptr_->setRunning(true);
56 0 : generator_ptr_->StartCmd(run_number, 0, 0);
57 :
58 0 : running_ = true;
59 0 : boost::thread::attributes attrs;
60 0 : attrs.set_stack_size(4096 * 2000); // 8 MB
61 0 : receive_thread_.reset(new boost::thread(attrs, boost::bind(&GenToBufferTest::receive_fragments, this)));
62 0 : send_thread_.reset(new boost::thread(attrs, boost::bind(&GenToBufferTest::send_fragments, this)));
63 0 : }
64 : /**
65 : * @brief Stop the test fixture
66 : */
67 0 : void stop()
68 : {
69 0 : generator_ptr_->StopCmd(0, 0);
70 0 : fragment_buffer_ptr_->Stop();
71 0 : request_buffer_ptr_->setRunning(false);
72 :
73 0 : running_ = false;
74 0 : if (receive_thread_ && receive_thread_->joinable()) receive_thread_->join();
75 0 : if (send_thread_ && send_thread_->joinable()) send_thread_->join();
76 0 : metricMan->do_stop();
77 0 : }
78 :
79 : /**
80 : * @brief Get a handle to the RequestBuffer
81 : * @return RequestBuffer shared_ptr handle
82 : */
83 0 : std::shared_ptr<RequestBuffer> GetRequestBuffer() { return request_buffer_ptr_; }
84 :
85 : private:
86 0 : void receive_fragments()
87 : {
88 0 : TLOG(TLVL_DEBUG) << "Waiting for first fragment.";
89 0 : artdaq::FragmentPtrs frags;
90 :
91 0 : bool active = true;
92 :
93 0 : while (active && running_)
94 : {
95 0 : auto loop_start = std::chrono::steady_clock::now();
96 0 : TLOG(18) << "receive_fragments getNext start";
97 0 : active = generator_ptr_->getNext(frags);
98 0 : TLOG(18) << "receive_fragments getNext done (active=" << active << ")";
99 0 : auto after_getnext = std::chrono::steady_clock::now();
100 : // 08-May-2015, KAB & JCF: if the generator getNext() method returns false
101 : // (which indicates that the data flow has stopped) *and* the reason that
102 : // it has stopped is because there was an exception that wasn't handled by
103 : // the experiment-specific FragmentGenerator class, we move to the
104 : // InRunError state so that external observers (e.g. RunControl or
105 : // DAQInterface) can see that there was a problem.
106 0 : if (!active && generator_ptr_ && generator_ptr_->exception())
107 : {
108 0 : TLOG(TLVL_ERROR) << "Generator has an exception, aborting!";
109 0 : break;
110 : }
111 :
112 0 : if (!active) { break; }
113 :
114 0 : if (frags.size() > 0)
115 : {
116 0 : metricMan->sendMetric("Fragments Generated", frags.size(), "fragments", 3, artdaq::MetricMode::Accumulate | artdaq::MetricMode::Rate | artdaq::MetricMode::Average);
117 :
118 0 : TLOG(18) << "receive_fragments AddFragmentsToBuffer start";
119 0 : fragment_buffer_ptr_->AddFragmentsToBuffer(std::move(frags));
120 0 : TLOG(18) << "receive_fragments AddFragmentsToBuffer done";
121 0 : auto after_addFragsToBuffer = std::chrono::steady_clock::now();
122 0 : metricMan->sendMetric("FragmentBufferAddTime", artdaq::TimeUtils::GetElapsedTime(after_getnext, after_addFragsToBuffer), "s", 3, artdaq::MetricMode::Accumulate | artdaq::MetricMode::Average | artdaq::MetricMode::Minimum | artdaq::MetricMode::Maximum);
123 : }
124 0 : metricMan->sendMetric("GetNextTime", artdaq::TimeUtils::GetElapsedTime(loop_start, after_getnext), "s", 3, artdaq::MetricMode::Accumulate | artdaq::MetricMode::Average | artdaq::MetricMode::Minimum | artdaq::MetricMode::Maximum);
125 :
126 0 : frags.clear();
127 : }
128 0 : TLOG(TLVL_DEBUG) << "receive_fragments loop end";
129 0 : }
130 :
131 0 : void send_fragments()
132 : {
133 0 : TLOG(TLVL_DEBUG) << "Waiting for first fragment.";
134 0 : artdaq::FragmentPtrs frags;
135 :
136 0 : bool active = true;
137 :
138 0 : while (active && running_)
139 : {
140 0 : auto loop_start = std::chrono::steady_clock::now();
141 :
142 0 : TLOG(18) << "send_fragments applyRequests start";
143 0 : active = fragment_buffer_ptr_->applyRequests(frags);
144 0 : TLOG(18) << "send_fragments applyRequests done (active=" << active << ")";
145 :
146 0 : auto after_requests = std::chrono::steady_clock::now();
147 0 : if (!active) { break; }
148 :
149 0 : for (auto& fragPtr : frags)
150 : {
151 0 : if (!fragPtr.get())
152 : {
153 0 : TLOG(TLVL_WARNING) << "Encountered a bad fragment pointer in fragment " << fragment_count_ << ". "
154 0 : << "This is most likely caused by a problem with the Fragment Generator!";
155 0 : continue;
156 0 : }
157 0 : if (fragment_count_ == 0)
158 : {
159 0 : TLOG(TLVL_DEBUG) << "Received first Fragment from Fragment Generator, sequence ID " << fragPtr->sequenceID() << ", size = " << fragPtr->sizeBytes() << " bytes.";
160 : }
161 0 : artdaq::Fragment::sequence_id_t sequence_id = fragPtr->sequenceID();
162 0 : artdaq::Globals::SetMFIteration("Sequence ID " + std::to_string(sequence_id));
163 :
164 0 : TLOG(17) << "send_fragments seq=" << sequence_id << " sendFragment start";
165 0 : ++fragment_count_;
166 :
167 : // Turn on lvls (mem and/or slow) 3,13,14 to log every send.
168 0 : TLOG(((fragment_count_ == 1) ? TLVL_DEBUG
169 : : (((fragment_count_ % 250) == 0) ? 13 : 14)))
170 0 : << ((fragment_count_ == 1)
171 0 : ? "Sent first Fragment"
172 0 : : "Sending fragment " + std::to_string(fragment_count_))
173 0 : << " with SeqID " << sequence_id << ".";
174 : }
175 :
176 0 : metricMan->sendMetric("Fragments Discarded", frags.size(), "fragments", 3, artdaq::MetricMode::Accumulate | artdaq::MetricMode::Rate | artdaq::MetricMode::Average);
177 0 : frags.clear();
178 0 : auto after_frag_check = std::chrono::steady_clock::now();
179 0 : metricMan->sendMetric("ApplyRequestsTime", artdaq::TimeUtils::GetElapsedTime(loop_start, after_requests), "s", 3, artdaq::MetricMode::Average | artdaq::MetricMode::Accumulate | artdaq::MetricMode::Maximum | artdaq::MetricMode::Minimum);
180 0 : metricMan->sendMetric("FragmentDiscardTime", artdaq::TimeUtils::GetElapsedTime(after_requests, after_frag_check), "s", 3, artdaq::MetricMode::Average | artdaq::MetricMode::Accumulate);
181 :
182 0 : std::this_thread::yield();
183 : }
184 :
185 : // 11-May-2015, KAB: call MetricManager::do_stop whenever we exit the
186 : // processing fragments loop so that metrics correctly go to zero when
187 : // there is no data flowing
188 0 : metricMan->do_stop();
189 :
190 0 : TLOG(TLVL_DEBUG) << "send_fragments loop end";
191 0 : }
192 :
193 : private:
194 : std::unique_ptr<CommandableFragmentGenerator> generator_ptr_;
195 : std::unique_ptr<FragmentBuffer> fragment_buffer_ptr_;
196 : std::shared_ptr<RequestBuffer> request_buffer_ptr_;
197 : std::atomic<size_t> fragment_count_;
198 : std::atomic<bool> running_;
199 : std::unique_ptr<boost::thread> receive_thread_;
200 : std::unique_ptr<boost::thread> send_thread_;
201 : };
202 : } // namespace artdaq
203 :
204 0 : int main(int argc, char* argv[])
205 : {
206 0 : artdaq::configureMessageFacility("RequestSender");
207 :
208 : struct FragmentReceiverConfig
209 : {
210 : fhicl::TableFragment<artdaq::CommandableFragmentGenerator::Config> generatorConfig;
211 : fhicl::TableFragment<artdaq::FragmentBuffer::Config> fragmentBufferConfig;
212 : };
213 :
214 : struct DAQConfig
215 : {
216 : fhicl::Table<FragmentReceiverConfig> frConfig{fhicl::Name{"fragment_receiver"}};
217 : };
218 :
219 : struct Config
220 : {
221 : fhicl::Table<DAQConfig> daq{fhicl::Name{"daq"}};
222 : fhicl::Table<artdaq::MetricManager::Config> metrics{fhicl::Name{"metrics"}};
223 : fhicl::Atom<double> test_duration_s{fhicl::Name{"test_duration_s"}, fhicl::Comment{"Duration, in seconds, for the test"}, 60.0};
224 : fhicl::Atom<size_t> time_between_requests_us{fhicl::Name{"time_between_requests_us"}, fhicl::Comment{"Amount of time to wait between generated requests, in us"}, 1000};
225 : fhicl::Atom<artdaq::Fragment::timestamp_t> timestamp_increment{fhicl::Name{"timestamp_increment"}, fhicl::Comment{"Amount to increment the timestamp for each request"}, 1};
226 : fhicl::Atom<int> run_number{fhicl::Name{"run_number"}, fhicl::Comment{"Run Number to use for the test"}, 101};
227 : };
228 :
229 0 : auto pset = LoadParameterSet<Config>(argc, argv, "GenToBuffer", "This test application evaluates the rate of Fragment Generation and Request Application.");
230 0 : auto fr_pset = pset;
231 :
232 0 : metricMan->initialize(pset.get<fhicl::ParameterSet>("metrics", fhicl::ParameterSet()), "GenToBuffer");
233 :
234 0 : if (pset.has_key("daq"))
235 : {
236 0 : fr_pset = pset.get<fhicl::ParameterSet>("daq");
237 : }
238 :
239 0 : if (fr_pset.has_key("fragment_receiver"))
240 : {
241 0 : fr_pset = fr_pset.get<fhicl::ParameterSet>("fragment_receiver");
242 : }
243 0 : artdaq::GenToBufferTest gtbt(fr_pset);
244 :
245 0 : auto buf = gtbt.GetRequestBuffer();
246 :
247 0 : auto start_time = std::chrono::steady_clock::now();
248 0 : auto duration = pset.get<double>("test_duration_s", 60);
249 0 : artdaq::Fragment::sequence_id_t seq = 0;
250 0 : artdaq::Fragment::timestamp_t timestamp = 1;
251 0 : auto time_between_requests_us = pset.get<size_t>("time_between_requests_us", 1000);
252 0 : auto timestamp_scale = pset.get<artdaq::Fragment::timestamp_t>("timestamp_increment", 1);
253 :
254 0 : gtbt.start(pset.get<int>("run_number", 101));
255 :
256 0 : while (artdaq::TimeUtils::GetElapsedTime(start_time) < duration)
257 : {
258 0 : buf->push(++seq, timestamp);
259 0 : timestamp += timestamp_scale;
260 :
261 0 : auto us_since_start = artdaq::TimeUtils::GetElapsedTimeMicroseconds(start_time);
262 0 : int64_t time_diff = seq * time_between_requests_us - us_since_start;
263 0 : TLOG(40) << "Time Diff: " << time_diff << ", Time since start: " << us_since_start << ", current epoch: " << seq * time_between_requests_us;
264 0 : if (time_diff > 10)
265 : {
266 0 : usleep(time_diff);
267 : }
268 : }
269 :
270 0 : gtbt.stop();
271 0 : }
|