Line data Source code
1 : #ifndef artdaq_Application_MPI2_RoutingManagerCore_hh
2 : #define artdaq_Application_MPI2_RoutingManagerCore_hh
3 :
4 : #include "artdaq/DAQrate/StatisticsHelper.hh"
5 : #include "artdaq/DAQrate/detail/RoutingPacket.hh"
6 : #include "artdaq/DAQrate/detail/TokenReceiver.hh"
7 : #include "artdaq/RoutingPolicies/RoutingManagerPolicy.hh"
8 :
9 : #include "canvas/Persistency/Provenance/RunID.h"
10 : #include "fhiclcpp/ParameterSet.h"
11 :
12 : #include <boost/thread.hpp>
13 :
14 : // Socket Includes
15 : #include <netinet/in.h>
16 : #include <sys/epoll.h>
17 :
18 : #include <atomic>
19 : #include <map>
20 : #include <memory>
21 : #include <mutex>
22 : #include <set>
23 : #include <string>
24 :
25 : namespace artdaq {
26 : class RoutingManagerCore; // Forward declaration; the class is defined below through its qualified name artdaq::RoutingManagerCore
27 : } // namespace artdaq
28 :
29 : /**
30 : * \brief RoutingManagerCore implements the state machine for the RoutingManager artdaq application.
31 : * RoutingManagerCore collects tokens from receivers, and at regular intervals uses these tokens to build
32 : * Routing Tables that are sent to the senders. Instances can be neither copied nor moved (all four operations are deleted).
33 : */
34 : class artdaq::RoutingManagerCore
35 : {
36 : public:
37 : static const std::string TABLE_UPDATES_STAT_KEY; ///< Key for the Table Update count MonitoredQuantity
38 : static const std::string TOKENS_RECEIVED_STAT_KEY; ///< Key for the Tokens Received MonitoredQuantity
39 : static const std::string CURRENT_TABLE_INTERVAL_STAT_KEY; ///< Key for the Current Table Interval MonitoredQuantity
40 :
41 : /**
42 : * \brief RoutingManagerCore default Constructor (takes no arguments).
43 : */
44 : RoutingManagerCore();
45 :
46 : /**
47 : * \brief Copy Constructor is deleted
48 : */
49 : RoutingManagerCore(RoutingManagerCore const&) = delete;
50 :
51 : /**
52 : * \brief Destructor.
53 : */
54 : ~RoutingManagerCore();
55 :
56 : /**
57 : * \brief Copy Assignment operator is deleted
58 : * \return Declared as RoutingManagerCore&, but never returned: the operator is deleted and cannot be called
59 : */
60 : RoutingManagerCore& operator=(RoutingManagerCore const&) = delete;
61 :
62 : RoutingManagerCore(RoutingManagerCore&&) = delete; ///< Move Constructor is deleted
63 : RoutingManagerCore& operator=(RoutingManagerCore&&) = delete; ///< Move Assignment Operator is deleted
64 :
65 : /**
66 : * \brief Processes the initialize request.
67 : * \param pset ParameterSet used to configure the RoutingManagerCore
68 : * \return Whether the initialize attempt succeeded
69 : *
70 : * \verbatim
71 : * RoutingManagerCore accepts the following Parameters:
72 : * "daq" (REQUIRED): FHiCL table containing DAQ configuration
73 : * "policy" (REQUIRED): FHiCL table containing the RoutingManagerPolicy configuration
74 : * "policy" (Default: ""): Name of the RoutingManagerPolicy plugin to load (NOTE(review): same key as the line above; presumably nested inside that "policy" table -- confirm)
75 : * "rt_priority" (Default: 0): Unix process priority to assign to RoutingManagerCore
76 : * "table_update_interval_ms" (Default: 1000): Maximum amount of time between table updates
77 : * "table_update_interval_high_frac" (Default: 0.75): Fraction of the maximum seen table size at which the interval should be reduced
78 : * "table_update_interval_low_frac" (Default: 0.5): Fraction of the maximum seen table size at which the interval should be increased
79 : * "senders_send_by_send_count" (Default: false): If true, senders will use the current send count to lookup routing information in the table, instead of sequence ID.
80 : * "table_ack_retry_count" (Default: 5): The number of times the table will be resent while waiting for acknowledgements
81 : * "table_update_port" (Default: 35556): The port on which to send table updates
82 : * "table_acknowledge_port" (Default: 35557): The port on which to listen for RoutingAckPacket datagrams
83 : * "table_update_address" (Default: "227.128.12.28"): Multicast address to send table updates to
84 : * "routing_manager_hostname" (Default: "localhost"): Hostname to send table updates from
85 : * "metrics": FHiCL table containing configuration for MetricManager
86 : * \endverbatim
87 : */
88 : bool initialize(fhicl::ParameterSet const& pset, uint64_t, uint64_t); // two unnamed args: presumably timeout and timestamp, as in soft_initialize() -- confirm
89 :
90 : /**
91 : * \brief Start the RoutingManagerCore
92 : * \param id Run ID of the current run
93 : * \return True if no exception
94 : */
95 : bool start(art::RunID id, uint64_t, uint64_t); // two unnamed args: presumably timeout and timestamp -- confirm
96 :
97 : /**
98 : * \brief Stops the RoutingManagerCore
99 : * \return True if no exception
100 : */
101 : bool stop(uint64_t, uint64_t); // two unnamed args: presumably timeout and timestamp -- confirm
102 :
103 : /**
104 : * \brief Pauses the RoutingManagerCore
105 : * \return True if no exception
106 : */
107 : bool pause(uint64_t, uint64_t); // two unnamed args: presumably timeout and timestamp -- confirm
108 :
109 : /**
110 : * \brief Resumes the RoutingManagerCore
111 : * \return True if no exception
112 : */
113 : bool resume(uint64_t, uint64_t); // two unnamed args: presumably timeout and timestamp -- confirm
114 :
115 : /**
116 : * \brief Shuts Down the RoutingManagerCore
117 : * \return If the shutdown was successful
118 : */
119 : bool shutdown(uint64_t); // single unnamed arg: presumably a timeout -- confirm
120 :
121 : /**
122 : * \brief Soft-Initializes the RoutingManagerCore.
123 : * \param pset ParameterSet for configuring RoutingManagerCore
124 : * \param timeout Timeout for transition
125 : * \param timestamp Timestamp of transition
126 : * \return Returns initialize status
127 : */
128 : bool soft_initialize(fhicl::ParameterSet const& pset, uint64_t timeout, uint64_t timestamp);
129 :
130 : /**
131 : * \brief Reinitializes the RoutingManagerCore.
132 : * \param pset ParameterSet for configuring RoutingManagerCore
133 : * \param timeout Timeout for transition
134 : * \param timestamp Timestamp of transition
135 : * \return Returns initialize status
136 : */
137 : bool reinitialize(fhicl::ParameterSet const& pset, uint64_t timeout, uint64_t timestamp);
138 :
139 : /**
140 : * \brief Main loop of the RoutingManagerCore. Determines when to send the next table update,
141 : * asks the RoutingManagerPolicy for the table to send, and sends it.
142 : */
143 : void process_event_table();
144 :
145 : /**
146 : * \brief Sends a detail::RoutingPacket to the table receivers
147 : * \param packet The detail::RoutingPacket to send (taken by value)
148 : *
149 : * send_event_table checks the table update socket and the acknowledge socket before
150 : * sending the table update the first time. It then enters a loop where it sends the table
151 : * update, then waits for acknowledgement packets. It keeps track of which senders have sent
152 : * their acknowledgement packets, and discards duplicate acks. It leaves this loop once all
153 : * senders have sent a valid acknowledgement packet.
154 : */
155 : void send_event_table(detail::RoutingPacket packet);
156 :
157 : /**
158 : * \brief Send a report on the current status of the RoutingManagerCore
159 : * \return A string containing the report on the current status of the RoutingManagerCore
160 : * The std::string const& parameter is unnamed in this declaration.
161 : */
162 : std::string report(std::string const&) const;
163 :
164 : /**
165 : * \brief Get the number of table updates sent by this RoutingManager this run
166 : * \return The number of table updates sent by this RoutingManager this run (current value of the atomic table_update_count_)
167 : */
168 0 : size_t get_update_count() const { return table_update_count_; }
169 :
170 : private:
171 : art::RunID run_id_; ///< Run identifier (presumably the id passed to start() -- confirm)
172 :
173 : fhicl::ParameterSet policy_pset_; ///< Presumably the "policy" table described at initialize() -- confirm
174 : fhicl::ParameterSet token_receiver_pset_; ///< Presumably the configuration handed to token_receiver_ -- confirm
175 : int rt_priority_; ///< Presumably the "rt_priority" parameter described at initialize() -- confirm
176 :
177 : size_t max_table_update_interval_ms_; ///< Presumably the "table_update_interval_ms" parameter described at initialize() -- confirm
178 : std::atomic<size_t> current_table_interval_ms_; ///< Atomic; presumably the table update interval currently in effect, in ms -- confirm
179 : double table_update_high_fraction_; ///< Presumably the "table_update_interval_high_frac" parameter -- confirm
180 : double table_update_low_fraction_; ///< Presumably the "table_update_interval_low_frac" parameter -- confirm
181 : std::atomic<size_t> table_update_count_; ///< Atomic counter whose value is returned by get_update_count()
182 :
183 : std::shared_ptr<RoutingManagerPolicy> policy_; ///< Shared-ownership pointer to a RoutingManagerPolicy
184 : std::unique_ptr<TokenReceiver> token_receiver_; ///< Uniquely-owned TokenReceiver
185 :
186 : std::atomic<bool> shutdown_requested_; ///< Atomic flag (presumably set by shutdown() -- confirm)
187 : std::atomic<bool> stop_requested_; ///< Atomic flag (presumably set by stop() -- confirm)
188 : std::atomic<bool> pause_requested_; ///< Atomic flag (presumably set by pause() -- confirm)
189 :
190 : std::unique_ptr<boost::thread> listen_thread_; ///< Uniquely-owned boost::thread (presumably runs listen_() -- confirm)
191 : int table_listen_port_; ///< Port number (presumably the one listen_() listens on -- confirm)
192 :
193 : std::map<int, std::set<int>> connected_fds_; ///< Maps an int key to a set of ints (presumably file descriptors of connected peers -- confirm)
194 : int epoll_fd_{-1}; ///< Defaults to -1 (presumably an epoll instance descriptor, -1 meaning "not open" -- confirm)
195 : mutable std::mutex fd_mutex_; ///< mutable, so const member functions can lock it (presumably guards connected_fds_ -- confirm)
196 :
197 : // attributes and methods for statistics gathering & reporting
198 : std::shared_ptr<artdaq::StatisticsHelper> statsHelperPtr_; ///< Shared-ownership pointer to the StatisticsHelper
199 :
200 : std::string buildStatisticsString_() const; // const: does not modify non-mutable members
201 :
202 : void sendMetrics_();
203 :
204 : void listen_(); // NOTE(review): presumably the body of listen_thread_ -- confirm
205 : void receive_();
206 : int find_fd_(int fd) const; // NOTE(review): presumably looks fd up in connected_fds_; meaning of the int result is not visible here -- confirm
207 : };
208 :
209 : #endif /* artdaq_Application_MPI2_RoutingManagerCore_hh */
|