Line data Source code
1 : #ifndef ARTDAQ_DAQRATE_REQUEST_RECEVIER_HH
2 : #define ARTDAQ_DAQRATE_REQUEST_RECEVIER_HH
3 :
4 : #include "TRACE/tracemf.h" // Pre-empt TRACE/trace.h from Fragment.hh.
5 : #include "artdaq-core/Data/Fragment.hh"
6 :
7 : #include "artdaq/DAQrate/RequestBuffer.hh"
8 :
9 : namespace fhicl {
10 : class ParameterSet;  // Forward declaration: this header only names ParameterSet as a const reference in the RequestReceiver constructor signature.
11 : }
12 :
13 : #include "fhiclcpp/types/Atom.h"
14 : #include "fhiclcpp/types/Comment.h"
15 : #include "fhiclcpp/types/ConfigurationTable.h"
16 : #include "fhiclcpp/types/Name.h"
17 :
18 : #include <boost/thread.hpp>
19 : #include <mutex>
20 :
21 : namespace artdaq {
22 : /// <summary>
23 : /// Receive data requests and make them available to CommandableFragmentGenerator or other interested parties. Track received requests and report errors when inconsistency is detected.
24 : /// </summary>
25 : class RequestReceiver
26 : {
27 : public:
28 : /// <summary>
29 : /// Configuration of the RequestReceiver. May be used for parameter validation (see the Parameters alias declared after this struct).
30 : /// </summary>
31 : struct Config
32 : {
33 : /// "receive_requests" (Default: false): Whether this RequestReceiver will listen for requests
34 : fhicl::Atom<bool> receive_requests{fhicl::Name{"receive_requests"}, fhicl::Comment{"Whether this RequestReceiver will listen for requests"}, false};
35 : /// "request_port" (Default: 3001) : Port on which data requests will be received
36 : fhicl::Atom<int> request_port{fhicl::Name{"request_port"}, fhicl::Comment{"Port to listen for request messages on"}, 3001};
37 : /// "request_address" (Default: "227.128.12.26") : Multicast address on which request messages are listened for
38 : fhicl::Atom<std::string> request_addr{fhicl::Name{"request_address"}, fhicl::Comment{"Multicast address to listen for request messages on"}, "227.128.12.26"};
39 : /// "multicast_interface_ip" (Default: "0.0.0.0") : Use this hostname for multicast (to assign to the proper NIC). Note: the C++ member is named output_address, but the FHiCL key is "multicast_interface_ip".
40 : fhicl::Atom<std::string> output_address{fhicl::Name{"multicast_interface_ip"}, fhicl::Comment{"Use this hostname for multicast (to assign to the proper NIC)"}, "0.0.0.0"};
41 : /// "end_of_run_quiet_timeout_ms" (Default: 1000) : Time, in milliseconds, that the entire system must be quiet for check_stop to return true in request mode. **DO NOT EDIT UNLESS YOU KNOW WHAT YOU ARE DOING!**
42 : fhicl::Atom<size_t> end_of_run_timeout_ms{fhicl::Name{"end_of_run_quiet_timeout_ms"}, fhicl::Comment{"Amount of time (in ms) to wait for no new requests when a Stop transition is pending"}, 1000};
43 : };
44 : /// Used for ParameterSet validation (if desired); wraps Config in a fhicl::WrappedTable
45 : using Parameters = fhicl::WrappedTable<Config>;
46 :
47 : /**
48 : * \brief RequestReceiver Default Constructor
49 : */
50 : RequestReceiver();
51 :
52 : /**
53 : * \brief RequestReceiver Constructor
54 : * \param ps ParameterSet used to configure RequestReceiver. See artdaq::RequestReceiver::Config
55 : * \param output_buffer Pointer to RequestBuffer where Requests should be stored
56 : */
57 : RequestReceiver(const fhicl::ParameterSet& ps, std::shared_ptr<RequestBuffer> output_buffer);
58 :
59 : /**
60 : * \brief RequestReceiver Destructor (virtual, so deletion through a base-class pointer is well-defined)
61 : */
62 : virtual ~RequestReceiver();
63 :
64 : /**
65 : * \brief Opens the socket used to listen for data requests
66 : */
67 : void setupRequestListener();
68 :
69 : /**
70 : * \brief Disables (stops) the reception of data requests
71 : * \param force Whether to suppress any error messages (used if called from destructor). Defaults to false.
72 : */
73 : void stopRequestReception(bool force = false);
74 :
75 : /**
76 : * \brief Enables (starts) the reception of data requests
77 : */
78 : void startRequestReception();
79 :
80 : /**
81 : * \brief This function receives data request packets, adding new requests to the request list
82 : */
83 : void receiveRequestsLoop();
84 :
85 : /// <summary>
86 : /// Determine if the RequestReceiver is receiving requests. Returns the running_ flag as-is; this accessor takes no lock.
87 : /// </summary>
88 : /// <returns>True if the request receiver is running (current value of running_)</returns>
89 : bool isRunning() { return running_; }  // NOTE(review): not const-qualified, and running_ is a plain bool rather than std::atomic -- confirm whether it is read from a different thread than the one that writes it.
90 :
91 : /// <summary>
92 : /// Sets the current run number (stored in run_number_)
93 : /// </summary>
94 : /// <param name="run">The current run number</param>
95 0 : void SetRunNumber(uint32_t run) { run_number_ = run; }
96 : /// <summary>Get the current value of the requests_received_ atomic counter (presumably the number of request messages received so far -- confirm in the implementation).</summary>
97 0 : size_t GetReceivedMessageCount() { return requests_received_.load(); }
98 :
99 : private:
100 : RequestReceiver(RequestReceiver const&) = delete;  // Copy and move operations are all deleted: a RequestReceiver cannot be copied or moved.
101 : RequestReceiver(RequestReceiver&&) = delete;
102 : RequestReceiver& operator=(RequestReceiver const&) = delete;
103 : RequestReceiver& operator=(RequestReceiver&&) = delete;
104 :
105 : bool running_{false};  ///< Value reported by isRunning()
106 : std::atomic<bool> request_stop_requested_;  ///< No in-class initializer; presumably set in the constructors -- confirm. NOTE(review): <atomic>, <chrono>, <memory> and <string> are not included directly by this header.
107 : std::atomic<size_t> requests_received_;  ///< Counter reported by GetReceivedMessageCount(); no in-class initializer
108 : std::atomic<bool> should_stop_;  ///< No in-class initializer; presumably set in the constructors -- confirm
109 :
110 : int request_port_{3001};  ///< Presumably populated from the "request_port" parameter -- confirm in the implementation
111 : uint32_t run_number_{0};  ///< Written by SetRunNumber()
112 : std::string request_addr_;  ///< Presumably populated from the "request_address" parameter -- confirm in the implementation
113 : std::string multicast_in_addr_;  ///< Presumably populated from the "multicast_interface_ip" parameter -- confirm in the implementation
114 : bool receive_requests_;  ///< No in-class initializer; presumably populated from the "receive_requests" parameter -- confirm
115 :
116 : // Socket parameters
117 : int request_socket_{-1};  ///< Initialized to -1 (presumably "no socket open" -- confirm)
118 : std::chrono::steady_clock::time_point request_stop_timeout_;
119 : size_t end_of_run_timeout_ms_{1000};  ///< Same default (1000) as Config::end_of_run_timeout_ms
120 : mutable std::mutex state_mutex_;  ///< NOTE(review): what this mutex guards is not visible in the header -- see the implementation
121 : boost::thread requestThread_;  ///< Presumably the thread that runs receiveRequestsLoop() -- confirm in the implementation
122 :
123 : std::shared_ptr<RequestBuffer> requests_;  ///< Presumably the output_buffer passed to the constructor -- confirm in the implementation
124 : };
125 : } // namespace artdaq
126 :
127 : #endif // ARTDAQ_DAQRATE_REQUEST_RECEVIER_HH
|