Line data Source code
1 : #ifndef ARTDAQ_DAQRATE_TOKEN_RECEIVER_HH
2 : #define ARTDAQ_DAQRATE_TOKEN_RECEIVER_HH
3 :
4 : #include "artdaq/DAQrate/StatisticsHelper.hh"
5 : #include "artdaq/RoutingPolicies/RoutingManagerPolicy.hh"
6 :
7 : #include "fhiclcpp/types/Atom.h"
8 : #include "fhiclcpp/types/Comment.h"
9 : #include "fhiclcpp/types/ConfigurationTable.h"
10 : #include "fhiclcpp/types/Name.h"
11 :
12 : #include <boost/thread.hpp>
13 :
14 : #include <sys/epoll.h>
15 : #include <atomic>
16 : #include <map>
17 : #include <string>
18 : #include <unordered_map>
19 : #include <vector>
20 :
21 : namespace artdaq {
22 : /**
23 : * \brief Receives event builder "free buffer" tokens and adds them to a specified RoutingPolicy.
24 : */
25 : class TokenReceiver
26 : {
27 : public:
28 : /**
29 : * \brief Configuration of the TokenReceiver. May be used for parameter validation.
30 : */
31 : struct Config
32 : {
33 : /// "routing_token_port" (Default: 35555) : Port on which routing tokens will be received
34 : fhicl::Atom<int> routing_token_port{fhicl::Name{"routing_token_port"}, fhicl::Comment{"Port to listen for routing tokens on"}, 355555};
35 : };
36 : /// Used for ParameterSet validation (if desired)
37 : using Parameters = fhicl::WrappedTable<Config>;
38 :
39 : /**
40 : * \brief TokenReceiver Constructor
41 : * \param ps ParameterSet used to configure TokenReceiver. See artdaq::TokenReceiver::Config
42 : * \param policy RoutingManagerPolicy that manages the received tokens
43 : * \param update_interval_msec The amount of time to wait in epoll_wait for a new update to arrive
44 : */
45 : explicit TokenReceiver(const fhicl::ParameterSet& ps, std::shared_ptr<RoutingManagerPolicy> policy,
46 : size_t update_interval_msec);
47 :
48 : /**
49 : * \brief TokenReceiver Destructor
50 : */
51 : virtual ~TokenReceiver();
52 :
53 : /**
54 : * \brief Starts the reception of event builder tokens
55 : */
56 : void startTokenReception();
57 :
58 : /**
59 : * \brief Temporarily suspends the reception of event builder tokens
60 : */
61 0 : void pauseTokenReception() { reception_is_paused_ = true; }
62 :
63 : /**
64 : * \brief Resumes the reception of event builder tokens after a suspension
65 : */
66 0 : void resumeTokenReception() { reception_is_paused_ = false; }
67 :
68 : /**
69 : * \brief Stops the reception of event builder tokens
70 : * \param force Whether to suppress any error messages (used if called from destructor)
71 : */
72 : void stopTokenReception(bool force = false);
73 :
74 : /**
75 : * \brief Specifies a StatisticsHelper instance to use when gathering statistics
76 : * \param helper A shared pointer to the StatisticsHelper instance
77 : * \param stat_key Name to use for gathering statistics on tokens received
78 : */
79 0 : void setStatsHelper(std::shared_ptr<StatisticsHelper> const& helper, std::string const& stat_key)
80 : {
81 0 : statsHelperPtr_ = helper;
82 0 : tokens_received_stat_key_ = stat_key;
83 0 : }
84 :
85 : /**
86 : * \brief Sets the current run number
87 : * \param run The current run number
88 : */
89 0 : void setRunNumber(uint32_t run) { run_number_ = run; }
90 :
91 : /**
92 : * \brief Returns the number of tokens that have been received
93 : * \return The number of tokens that have been received since the most recent start
94 : */
95 0 : size_t getReceivedTokenCount() const { return received_token_count_; }
96 :
97 : private:
98 : TokenReceiver(TokenReceiver const&) = delete;
99 : TokenReceiver(TokenReceiver&&) = delete;
100 : TokenReceiver& operator=(TokenReceiver const&) = delete;
101 : TokenReceiver& operator=(TokenReceiver&&) = delete;
102 :
103 : void receiveTokensLoop_();
104 :
105 : int token_port_;
106 : std::shared_ptr<RoutingManagerPolicy> policy_;
107 : size_t update_interval_msec_;
108 :
109 : int token_socket_{-1};
110 : std::vector<epoll_event> receive_token_events_;
111 : std::unordered_map<int, std::string> receive_token_addrs_;
112 : int token_epoll_fd_{-1};
113 :
114 : boost::thread token_thread_;
115 : std::atomic<bool> thread_is_running_;
116 : std::atomic<bool> reception_is_paused_;
117 : std::atomic<bool> shutdown_requested_;
118 : std::atomic<uint32_t> run_number_;
119 :
120 : std::atomic<size_t> received_token_count_;
121 : std::unordered_map<int, size_t> received_token_counter_;
122 : std::shared_ptr<StatisticsHelper> statsHelperPtr_;
123 : std::string tokens_received_stat_key_;
124 : };
125 : } // namespace artdaq
126 :
127 : #endif // ARTDAQ_DAQRATE_TOKEN_RECEIVER_HH
|