Line data Source code
1 : ////////////////////////////////////////////////////////////////////////
2 : // Class: RequestSender
3 : // Module Type: analyzer
4 : // File: RequestSender_module.cc
5 : // Description: Sends artdaq requests for events
6 : ////////////////////////////////////////////////////////////////////////
7 :
8 : #include "TRACE/tracemf.h"
9 : #define TRACE_NAME "RequestSenderModule"
10 :
11 : #include "art/Framework/Core/EDAnalyzer.h"
12 : #include "art/Framework/Core/ModuleMacros.h"
13 : #include "art/Framework/Principal/Event.h"
14 : #include "art/Framework/Principal/Handle.h"
15 : #include "art/Framework/Principal/Run.h"
16 :
17 : #include "artdaq-core/Data/ContainerFragment.hh"
18 : #include "artdaq-core/Data/Fragment.hh"
19 : #include "artdaq/DAQrate/detail/RequestSender.hh"
20 :
21 : #include <set>
22 :
23 : namespace artdaq {
24 : class RequestSenderModule;
25 : }
26 :
27 : /// <summary>
28 : /// An art::EDAnalyzer module which sends requests for events (e.g. for the Mu2e CRV system)
29 : /// </summary>
30 : class artdaq::RequestSenderModule : public art::EDAnalyzer
31 : {
32 : public:
33 : /**
34 : * \brief RequestSender Constructor
35 : * \param pset ParameterSet used to configure RequestSender
36 : *
37 : * See artdaq/DAQrate/detail/RequestSender.hh for RequestSender configuration
38 : * Additional Parameters:
39 : * "request_list_max_size": Maximum number of requests to send at once
40 : */
41 : explicit RequestSenderModule(fhicl::ParameterSet const& pset);
42 : /**
43 : * \brief Virtual Destructor. Shuts down MetricManager if one is present
44 : */
45 : ~RequestSenderModule() override;
46 :
47 : /**
48 : * \brief Analyze each event, using the configured mode bitmask
49 : * \param evt art::Event to analyze
50 : */
51 : void analyze(art::Event const& evt) override;
52 :
53 : /**
54 : * @brief Perform begin Run actions
55 : * @param run Run object
56 : */
57 : void beginRun(art::Run const& run) override;
58 : /**
59 : * @brief PErform end Run actions
60 : * @param run Run object
61 : */
62 : void endRun(art::Run const& run) override;
63 :
64 : private:
65 : RequestSenderModule(RequestSenderModule const&) = delete;
66 : RequestSenderModule(RequestSenderModule&&) = delete;
67 : RequestSenderModule& operator=(RequestSenderModule const&) = delete;
68 : RequestSenderModule& operator=(RequestSenderModule&&) = delete;
69 :
70 : RequestSender the_sender_;
71 : std::set<artdaq::Fragment::sequence_id_t> active_requests_;
72 : size_t max_active_requests_;
73 :
74 : void clear_active_requests();
75 : void trim_active_requests();
76 : };
77 :
78 0 : artdaq::RequestSenderModule::RequestSenderModule(fhicl::ParameterSet const& pset)
79 : : EDAnalyzer(pset)
80 0 : , the_sender_(pset)
81 0 : , max_active_requests_(pset.get<size_t>("request_list_max_size", 1000))
82 : {
83 0 : }
84 :
85 0 : artdaq::RequestSenderModule::~RequestSenderModule()
86 : {
87 0 : }
88 :
89 0 : void artdaq::RequestSenderModule::analyze(art::Event const& evt)
90 : {
91 : // get all the artdaq fragment collections in the event.
92 0 : std::vector<art::Handle<std::vector<artdaq::Fragment>>> fragmentHandles;
93 0 : fragmentHandles = evt.getMany<std::vector<artdaq::Fragment>>();
94 :
95 0 : artdaq::Fragment::sequence_id_t seq = 0;
96 0 : artdaq::Fragment::timestamp_t timestamp = 0;
97 :
98 0 : for (auto& handle : fragmentHandles)
99 : {
100 0 : if (!handle.isValid()) continue;
101 :
102 0 : auto frag = handle->at(0);
103 0 : seq = frag.sequenceID();
104 0 : timestamp = frag.timestamp();
105 :
106 0 : if (seq != 0 && timestamp != 0) break;
107 0 : }
108 :
109 0 : if (seq != 0 && timestamp != 0)
110 : {
111 0 : TLOG(TLVL_DEBUG + 35) << "Adding request for sequence ID " << seq << ", timestamp " << timestamp;
112 0 : the_sender_.AddRequest(seq, timestamp);
113 0 : active_requests_.insert(seq);
114 0 : trim_active_requests();
115 : }
116 0 : }
117 :
118 0 : void artdaq::RequestSenderModule::beginRun(art::Run const& run)
119 : {
120 0 : the_sender_.SetRunNumber(run.run());
121 0 : the_sender_.SetRequestMode(detail::RequestMessageMode::Normal);
122 0 : clear_active_requests();
123 0 : }
124 :
125 0 : void artdaq::RequestSenderModule::endRun(art::Run const&)
126 : {
127 0 : the_sender_.SetRequestMode(detail::RequestMessageMode::EndOfRun);
128 0 : the_sender_.SendRequest();
129 0 : clear_active_requests();
130 0 : }
131 :
132 0 : void artdaq::RequestSenderModule::clear_active_requests()
133 : {
134 0 : for (auto& req : active_requests_)
135 : {
136 0 : the_sender_.RemoveRequest(req);
137 : }
138 0 : }
139 :
140 0 : void artdaq::RequestSenderModule::trim_active_requests()
141 : {
142 0 : TLOG(TLVL_DEBUG + 33) << "Going to remove extra active requests";
143 0 : while (active_requests_.size() > max_active_requests_)
144 : {
145 0 : TLOG(TLVL_DEBUG + 36) << "Removing request with sequence ID " << *active_requests_.begin();
146 0 : the_sender_.RemoveRequest(*active_requests_.begin());
147 0 : active_requests_.erase(active_requests_.begin());
148 : }
149 0 : }
150 :
151 0 : DEFINE_ART_MODULE(artdaq::RequestSenderModule) // NOLINT(performance-unnecessary-value-param)
|