Line data Source code
1 : #ifndef artdaq_Application_Routing_RoutingManagerPolicy_hh
2 : #define artdaq_Application_Routing_RoutingManagerPolicy_hh
3 :
4 : #include "TRACE/tracemf.h" // Pre-empt TRACE/trace.h from Fragment.hh.
5 : #include "artdaq-core/Data/Fragment.hh"
6 :
7 : #include "artdaq/DAQrate/detail/RoutingPacket.hh" // No library dependence.
8 :
9 : namespace fhicl {
10 : class ParameterSet;
11 : }
12 :
#include <atomic>
#include <cstddef>
#include <deque>
#include <map>
#include <mutex>
#include <unordered_set>
#include <vector>
16 :
17 : namespace artdaq {
18 : /**
19 : * \brief The interface through which RoutingManagerCore obtains Routing Tables using received Routing Tokens
20 : */
21 : class RoutingManagerPolicy
22 : {
23 : public:
24 : /**
25 : * \brief RoutingManagerPolicy Constructor
26 : * \param ps ParameterSet used to configure the RoutingManagerPolicy
27 : *
28 : * \verbatim
29 : * RoutingManagerPolicy accepts the following Parameters:
30 : * \endverbatim
31 : */
32 : explicit RoutingManagerPolicy(const fhicl::ParameterSet& ps);
33 :
34 : /**
35 : * \brief Default virtual Destructor
36 : */
37 0 : virtual ~RoutingManagerPolicy() = default;
38 :
39 : /**
40 : * \brief Get the number of configured receivers
41 : * \return The size of the receiver_ranks list
42 : */
43 18 : size_t GetReceiverCount() const { return receiver_ranks_.size(); }
44 :
45 : /**
46 : * \brief Get the largest number of tokens that the RoutingManagerPolicy has seen at any one time
47 : * \return The largest number of tokens that the RoutingManagerPolicy has seen at any one time
48 : */
49 0 : size_t GetMaxNumberOfTokens() const { return max_token_count_; }
50 :
51 : /**
52 : * @brief Get the number of tokens that have been used since the last update
53 : * @return Current value of the token counter
54 : */
55 0 : size_t GetTokensUsedSinceLastUpdate() const { return tokens_used_since_last_update_; }
56 : /**
57 : * @brief Reset the number of tokens used
58 : */
59 5 : void ResetTokensUsedSinceLastUpdate() { tokens_used_since_last_update_ = 0; }
60 :
61 : /**
62 : * \brief Add a token to the token list
63 : * \param rank Rank that the token is from
64 : * \param new_slots_free Number of slots that are now free (should usually be 1)
65 : */
66 : void AddReceiverToken(int rank, unsigned new_slots_free);
67 :
68 : /**
69 : * \brief Reset the policy, setting the next sequence ID to be used to 1, and removing any tokens
70 : */
71 : void Reset();
72 :
73 : /**
74 : * @brief Get the next sequence ID to be routed
75 : * @return Next sequence ID to be routed
76 : */
77 0 : Fragment::sequence_id_t GetNextSequenceID() const { return next_sequence_id_; }
78 :
79 : /**
80 : * @brief Get the number of tokens that are waiting to be used
81 : * @return size of the held tokens list
82 : */
83 0 : size_t GetHeldTokenCount() const
84 : {
85 0 : std::unique_lock<std::mutex> lk(tokens_mutex_);
86 0 : return tokens_.size();
87 0 : }
88 :
89 : /**
90 : * @brief Create a RoutingPacket from currently-owned tokens. Used by EventBuilder and RequestBasedEventBuilder RoutingManagerMode
91 : * @return artdaq::detail::RoutingPacket created from tokens held by Routing Manager.
92 : */
93 : detail::RoutingPacket GetCurrentTable();
94 :
95 : /**
96 : * @brief Get an artdaq::detail::RoutingPacketEntry for a given sequence ID and rank. Used by RequestBasedEventBuilder and DataFlow RoutingManagerMode
97 : * @param seq Sequence Number to get route for
98 : * @param requesting_rank Rank to route for
99 : * @return artdaq::detail::RoutingPacketEntry connecting sequence ID to destination rank
100 : */
101 : detail::RoutingPacketEntry GetRouteForSequenceID(artdaq::Fragment::sequence_id_t seq, int requesting_rank);
102 :
103 : /**
104 : * @brief Get the current RoutingManagerMode of this RoutingManager
105 : * @return artdaq::detail::RoutingManagerMode value
106 : */
107 0 : detail::RoutingManagerMode GetRoutingMode() const { return routing_mode_; }
108 :
109 : // For tests
110 : /**
111 : * @brief Get the size of the routing cache. For testing
112 : * @return Size of the routing cache
113 : */
114 4 : size_t GetCacheSize() const { return routing_cache_.size(); }
115 : /**
116 : * @brief Determine whether the routing cache has a route for the given sequence ID. For testing
117 : * @param seq Sequence ID to check
118 : * @return True if the sequence ID is in the cache
119 : */
120 8 : bool CacheHasRoute(artdaq::Fragment::sequence_id_t seq) const { return routing_cache_.count(seq) != 0; }
121 :
122 : protected:
123 : /**
124 : * \brief Generate entries to add to the given table
125 : * \param tables The RoutingPacket to add entries to
126 : */
127 : virtual void CreateRoutingTable(detail::RoutingPacket& tables) = 0;
128 :
129 : /**
130 : * @brief Generate a route for the given sequence ID and source rank
131 : * @param seq Sequence ID to route
132 : * @param requesting_rank Source rank requesting routing information
133 : * @return An artdaq::detail::RoutingPacketEntry linking the sequence ID to a destination rank
134 : */
135 : virtual detail::RoutingPacketEntry CreateRouteForSequenceID(artdaq::Fragment::sequence_id_t seq, int requesting_rank) = 0;
136 :
137 : // Tokens
138 : std::deque<int> tokens_; ///< The list of tokens which are available for use
139 : std::atomic<size_t> tokens_used_since_last_update_; ///< Number of tokens consumed since last metric update
140 :
141 : // Routing Information
142 : Fragment::sequence_id_t next_sequence_id_; ///< The next sequence ID to be assigned
143 : std::unordered_set<int> receiver_ranks_; ///< Configured receiver (e.g. EventBuilder for BR->EB routing) ranks
144 : detail::RoutingManagerMode routing_mode_; ///< Current routing mode
145 :
146 : private:
147 : RoutingManagerPolicy(RoutingManagerPolicy const&) = delete;
148 : RoutingManagerPolicy(RoutingManagerPolicy&&) = delete;
149 : RoutingManagerPolicy& operator=(RoutingManagerPolicy const&) = delete;
150 : RoutingManagerPolicy& operator=(RoutingManagerPolicy&&) = delete;
151 :
152 : void CreateRoutingTableFromCache(detail::RoutingPacket& table); ///< Cache is used when in EventBuilding modes only, otherwise just creates RoutingTables structure
153 :
154 : void TrimRoutingCache();
155 : void UpdateCache(detail::RoutingPacket& table);
156 :
157 : struct RoutingCacheEntry
158 : {
159 : bool is_valid{false};
160 : int destination_rank{-1};
161 : Fragment::sequence_id_t sequence_id{artdaq::Fragment::InvalidSequenceID};
162 : int requesting_rank{-1};
163 : bool included_in_table{false};
164 :
165 : RoutingCacheEntry() {}
166 173 : RoutingCacheEntry(Fragment::sequence_id_t seq, int dest, int source)
167 173 : : is_valid(true), destination_rank(dest), sequence_id(seq), requesting_rank(source) {}
168 : };
169 : std::map<Fragment::sequence_id_t, std::vector<RoutingCacheEntry>> routing_cache_;
170 : size_t routing_cache_max_size_;
171 : mutable std::mutex routing_cache_mutex_;
172 : std::atomic<size_t> max_token_count_;
173 : mutable std::mutex tokens_mutex_;
174 : };
175 : } // namespace artdaq
176 :
177 : #endif // artdaq_Application_Routing_RoutingManagerPolicy_hh
|