Line data Source code
1 : #ifndef artdaq_DAQrate_RequestSender_hh
2 : #define artdaq_DAQrate_RequestSender_hh
3 :
4 : #include "artdaq/DAQrate/detail/RequestMessage.hh"
5 :
6 : #include "fhiclcpp/types/Atom.h"
7 : #include "fhiclcpp/types/Comment.h"
8 : #include "fhiclcpp/types/ConfigurationTable.h"
9 : #include "fhiclcpp/types/Name.h"
10 : #include "fhiclcpp/types/Table.h"
11 :
12 : #include <arpa/inet.h>
13 : #include <netinet/in.h>
14 : #include <sys/socket.h>
15 : #include <sys/types.h>
16 : #include <chrono>
17 : #include <cstdint>
18 : #include <map>
19 : #include <memory>
20 :
21 : namespace artdaq {
22 :
23 : /**
24 : * \brief The RequestSender contains methods used to send data requests and Routing tokens
25 : */
26 : class RequestSender
27 : {
28 : public:
29 : /// <summary>
30 : /// Configuration of the RequestSender. May be used for parameter validation
31 : /// </summary>
32 : struct Config
33 : {
34 : /// "send_requests" (Default: false): Whether to send DataRequests when new sequence IDs are seen
35 : fhicl::Atom<bool> send_requests{fhicl::Name{"send_requests"}, fhicl::Comment{"Enable sending Data Request messages"}, false};
36 : /// "request_port" (Default: 3001): Port to send DataRequests on
37 : fhicl::Atom<int> request_port{fhicl::Name{"request_port"}, fhicl::Comment{"Port to send DataRequests on"}, 3001};
38 : /// "request_delay_ms" (Default: 10): How long to wait before sending new DataRequests
39 : fhicl::Atom<size_t> request_delay_ms{fhicl::Name{"request_delay_ms"}, fhicl::Comment{"How long to wait before sending new DataRequests"}, 10};
40 : /// "request_shutdown_timeout_us" (Default: 100000 us): How long to wait for pending requests to be sent at shutdown
41 : fhicl::Atom<size_t> request_shutdown_timeout_us{fhicl::Name{"request_shutdown_timeout_us"}, fhicl::Comment{"How long to wait for pending requests to be sent at shutdown"}, 100000};
42 : /// "multicast_interface_ip" (Default: "0.0.0.0"): Use this hostname for multicast output (to assign to the proper NIC)
43 : fhicl::Atom<std::string> output_address{fhicl::Name{"multicast_interface_ip"}, fhicl::Comment{"Use this hostname for multicast output(to assign to the proper NIC)"}, "0.0.0.0"};
44 : /// "request_address" (Default: "227.128.12.26"): Multicast address to send DataRequests to
45 : fhicl::Atom<std::string> request_address{fhicl::Name{"request_address"}, fhicl::Comment{"Multicast address to send DataRequests to"}, "227.128.12.26"};
46 : /// "min_request_interval_ms" (Default: 500): Minimum time between automatic sends (ignored in EndOfRun RequetsMode)
47 : fhicl::Atom<size_t> min_request_interval_ms{fhicl::Name{"min_request_interval_ms"}, fhicl::Comment{"Minimum time between automatic sends (ignored in EndOfRun RequetsMode)"}, 100};
48 : };
49 : /// Used for ParameterSet validation (if desired)
50 : using Parameters = fhicl::WrappedTable<Config>;
51 :
52 : /**
53 : * \brief Default Constructor is deleted
54 : */
55 : RequestSender() = delete;
56 :
57 : /**
58 : * \brief Copy Constructor is deleted
59 : */
60 : RequestSender(RequestSender const&) = delete;
61 :
62 : /**
63 : * \brief Copy Assignment operator is deleted
64 : * \return RequestSender copy
65 : */
66 : RequestSender& operator=(RequestSender const&) = delete;
67 :
68 : RequestSender(RequestSender&&) = delete; ///< Move Constructor is deleted
69 : RequestSender& operator=(RequestSender&&) = delete; ///< Move-assignment operator is deleted
70 :
71 : /**
72 : * \brief RequestSender Constructor
73 : * \param pset ParameterSet used to configured RequestSender. See artdaq::RequestSender::Config
74 : */
75 : explicit RequestSender(const fhicl::ParameterSet& pset);
76 : /**
77 : * \brief RequestSender Destructor
78 : */
79 : virtual ~RequestSender();
80 :
81 : /**
82 : * \brief Set the mode for RequestMessages. Used to indicate when RequestSender should enter "EndOfRun" mode
83 : * \param mode Mode to set
84 : */
85 : void SetRequestMode(detail::RequestMessageMode mode);
86 :
87 : /**
88 : * \brief Get the mode for RequestMessages.
89 : * \return Current RequestMessageMode of the RequestSender
90 : */
91 1 : detail::RequestMessageMode GetRequestMode() const { return request_mode_; }
92 :
93 : /**
94 : * \brief Send a request message containing all current requests
95 : * \param endOfRunOnly Whether the request should only be sent in EndOfRun RequestMessageMode (default: false)
96 : */
97 : void SendRequest(bool endOfRunOnly = false);
98 :
99 : /**
100 : * \brief Add a request to the request list
101 : * \param seqID Sequence ID for request
102 : * \param timestamp Timestamp to request
103 : */
104 : void AddRequest(Fragment::sequence_id_t seqID, Fragment::timestamp_t timestamp);
105 :
106 : /**
107 : * \brief Remove a request from the request list
108 : * \param seqID Sequence ID of request
109 : */
110 : void RemoveRequest(Fragment::sequence_id_t seqID);
111 :
112 : /**
113 : * \brief Set the run number to be used in request messages
114 : * \param run Run number
115 : */
116 17 : void SetRunNumber(uint32_t run) { run_number_ = run; }
117 :
118 : /**
119 : * \brief Determine if the RequestSender is currently sending any requests
120 : * \return True if RequestSender has requests to send
121 : *
122 : * This function is used for testing
123 : */
124 : bool RequestsInFlight() { return request_sending_.load() != 0; }
125 :
126 : /**
127 : * @brief Get the number of requests sent by this RequestSender
128 : * @return The number of requests sent
129 : */
130 0 : size_t GetSentMessageCount() { return requests_sent_.load(); }
131 :
132 : private:
133 : private:
134 : // Request stuff
135 : bool send_requests_;
136 : std::atomic<bool> initialized_;
137 : mutable std::mutex request_mutex_;
138 : mutable std::mutex request_send_mutex_;
139 : std::map<Fragment::sequence_id_t, Fragment::timestamp_t> active_requests_;
140 : std::string request_address_;
141 : int request_port_;
142 : size_t request_delay_;
143 : size_t request_shutdown_timeout_us_;
144 : int request_socket_;
145 : struct sockaddr_in request_addr_;
146 : std::string multicast_out_addr_;
147 : detail::RequestMessageMode request_mode_;
148 : std::chrono::steady_clock::time_point last_request_send_time_;
149 : size_t min_request_interval_ms_;
150 :
151 : std::atomic<int> request_sending_;
152 : std::atomic<size_t> requests_sent_;
153 : uint32_t run_number_;
154 :
155 : private:
156 : void setup_requests_();
157 :
158 : void do_send_request_();
159 : };
160 : } // namespace artdaq
161 : #endif /* artdaq_DAQrate_RequestSender_hh */
|