Line data Source code
1 : #ifndef ARTDAQ_DAQRATE_DETAIL_TABLERECEIVER_HH
2 : #define ARTDAQ_DAQRATE_DETAIL_TABLERECEIVER_HH
3 :
4 : #include "TRACE/tracemf.h" // Pre-empt TRACE/trace.h from Fragment.hh.
5 : #include "artdaq-core/Data/Fragment.hh"
6 :
7 : namespace fhicl {
8 : class ParameterSet;
9 : }
10 :
11 : #include "fhiclcpp/types/Atom.h"
12 : #include "fhiclcpp/types/Comment.h"
13 : #include "fhiclcpp/types/ConfigurationTable.h"
14 : #include "fhiclcpp/types/Name.h"
15 : #include "fhiclcpp/types/OptionalTable.h"
16 : #include "fhiclcpp/types/TableFragment.h"
17 :
18 : #include "boost/thread.hpp"
19 :
20 : #include <netinet/in.h>
21 : #include <condition_variable>
22 : #include <map>
23 : #include <memory>
24 : #include <set>
25 :
26 : namespace artdaq {
27 : class TableReceiver;
28 : }
29 :
30 : /**
31 : * \brief Receives routing tables from a RoutingManager and provides lookups of the destination rank
32 : * assigned to each Fragment sequence ID.
33 : */
34 : class artdaq::TableReceiver
35 : {
36 : public:
37 : /// <summary>
38 : /// Configuration for Routing table reception
39 : ///
40 : /// This configuration should be the same for all processes receiving routing tables from a given RoutingManager.
41 : /// </summary>
42 : struct Config
43 : {
44 : /// "use_routing_manager" (Default: false): True if using the Routing Manager
45 : fhicl::Atom<bool> use_routing_manager{fhicl::Name{"use_routing_manager"}, fhicl::Comment{"True if using the Routing Manager"}, false};
46 : /// "route_on_request_mode" (Default: false): True if a request for routing information should be sent to the RoutingManager (versus RoutingManager pushing table updates).
47 : fhicl::Atom<bool> route_on_request_mode{fhicl::Name{"route_on_request_mode"}, fhicl::Comment{"True if a request for routing information should be sent to the RoutingManager (versus RoutingManager pushing table updates)."}, false};
48 : /// "use_routing_table_thread" (Default: true): True if a thread should be run to receive routing updates. Required if route_on_request_mode is false.
49 : fhicl::Atom<bool> use_routing_table_thread{fhicl::Name{"use_routing_table_thread"}, fhicl::Comment{"True if a thread should be run to receive routing updates. Required if route_on_request_mode is false."}, true};
50 : /// "table_update_port" (Default: 35556): Port to connect to for receiving table updates
51 : fhicl::Atom<int> table_port{fhicl::Name{"table_update_port"}, fhicl::Comment{"Port to connect to for receiving table updates"}, 35556};
52 : /// "routing_manager_hostname" (Default: "localhost"): RoutingManager hostname for Table connection
53 : fhicl::Atom<std::string> routing_manager_hostname{fhicl::Name{"routing_manager_hostname"}, fhicl::Comment{"RoutingManager hostname for Table connection"}, "localhost"};
54 : /// "routing_timeout_ms" (Default: 1000): Time to wait (in ms) for a routing table update
55 : fhicl::Atom<int> routing_timeout_ms{fhicl::Name{"routing_timeout_ms"}, fhicl::Comment{"Time to wait (in ms) for a routing table update"}, 1000};
56 : /// "routing_table_max_size" (Default: 1000): Maximum number of entries in the routing table
57 : fhicl::Atom<size_t> routing_table_max_size{fhicl::Name{"routing_table_max_size"}, fhicl::Comment{"Maximum number of entries in the routing table"}, 1000};
58 : };
59 : /// Used for ParameterSet validation (if desired)
60 : using Parameters = fhicl::WrappedTable<Config>;
61 :
62 : using RoutingTable = std::map<artdaq::Fragment::sequence_id_t, int>; ///< Internal representation of a routing table, relating a sequence ID to a destination rank
63 :
64 : static constexpr int ROUTING_FAILED = -1111; ///< Value used to indicate that a route was not properly generated
65 :
66 : /**
67 : * \brief TableReceiver Constructor
68 : * \param ps ParameterSet used to configure the TableReceiver. See artdaq::TableReceiver::Config
69 : */
70 : explicit TableReceiver(const fhicl::ParameterSet& ps);
71 :
72 : /**
73 : * \brief TableReceiver Destructor
74 : */
75 : virtual ~TableReceiver();
76 :
77 : /**
78 : * \brief Get a copy of the current RoutingTable
79 : */
80 : RoutingTable GetRoutingTable() const;
81 :
82 : /**
83 : * \brief Get the current RoutingTable and remove all entries
84 : */
85 : RoutingTable GetAndClearRoutingTable();
86 :
87 : /**
88 : * \brief Get the destination rank for the given sequence ID
89 : * \param seqID Sequence ID to query
90 : * \return Destination rank for the given sequence ID (presumably ROUTING_FAILED when no route is available; confirm against the implementation)
91 : */
92 : int GetRoutingTableEntry(artdaq::Fragment::sequence_id_t seqID);
93 :
94 : /**
95 : * \brief Gets the current size of the Routing Table, in case other parts of the system want to use this information
96 : * \return The current size of the Routing Table.
97 : */
98 : size_t GetRoutingTableEntryCount() const;
99 :
100 : /**
101 : * \brief Gets the number of sends remaining in the routing table, in case other parts of the system want to use this information
102 : * \return The number of sends remaining in the routing table
103 : */
104 : size_t GetRemainingRoutingTableEntries() const;
105 :
106 : /**
107 : * \brief Request that the TableReceiver stop, by setting the should_stop_ flag
108 : */
109 : void StopTableReceiver() { should_stop_ = true; }
110 :
111 : /**
112 : * \brief Remove the given sequence ID from the routing table and sent_count lists
113 : * \param seq Sequence ID to remove
114 : */
115 : void RemoveRoutingTableEntry(Fragment::sequence_id_t seq);
116 :
117 : /**
118 : * \brief Report metrics to MetricManager
119 : */
120 : void SendMetrics() const;
121 :
122 : /**
123 : * \brief Whether the TableReceiver will receive tables from the RoutingManager (returns the use_routing_manager_ flag)
124 : */
125 0 : bool RoutingManagerEnabled() const { return use_routing_manager_; }
126 :
127 : private:
128 : TableReceiver(TableReceiver const&) = delete;
129 : TableReceiver(TableReceiver&&) = delete;
130 : TableReceiver& operator=(TableReceiver const&) = delete;
131 : TableReceiver& operator=(TableReceiver&&) = delete;
132 :
133 : void connectToRoutingManager_();
134 : void disconnectFromRoutingManager_();
135 :
136 : void startTableReceiverThread_();
137 :
138 : bool receiveTableUpdate_();
139 : void receiveTableUpdatesLoop_();
140 :
141 : void sendTableUpdateRequest_(Fragment::sequence_id_t seq);
142 :
143 : private:
144 : bool use_routing_manager_; ///< Flag reported by RoutingManagerEnabled()
145 : std::atomic<bool> should_stop_; ///< Set to true by StopTableReceiver()
146 : int table_port_;
147 : std::string table_address_;
148 : int table_socket_;
149 : RoutingTable routing_table_; ///< Map of sequence ID to destination rank (see RoutingTable)
150 : Fragment::sequence_id_t routing_table_last_;
151 : size_t routing_table_max_size_;
152 : mutable std::mutex routing_mutex_;
153 : std::unique_ptr<boost::thread> routing_thread_;
154 : mutable std::atomic<size_t> routing_wait_time_;
155 : mutable std::atomic<size_t> routing_wait_time_count_;
156 : mutable std::condition_variable routing_cv_;
157 :
158 : size_t routing_timeout_ms_;
159 :
160 : mutable std::atomic<uint64_t> highest_sequence_id_routed_;
161 : };
162 :
163 : #endif // ARTDAQ_DAQRATE_DETAIL_TABLERECEIVER_HH
|