Line data Source code
1 : #include "artdaq/DAQdata/Globals.hh"
2 : #define TRACE_NAME (app_name + "_PreferSameHost_policy").c_str()
3 : #include "TRACE/tracemf.h"
4 :
5 : #include "artdaq/DAQdata/HostMap.hh"
6 : #include "artdaq/RoutingPolicies/PolicyMacros.hh"
7 : #include "artdaq/RoutingPolicies/RoutingManagerPolicy.hh"
8 :
9 : #include "fhiclcpp/ParameterSet.h"
10 :
11 : #include <map>
12 :
13 : namespace artdaq {
14 : /**
15 : * \brief A RoutingManagerPolicy which tries to keep data on the same host. For EventBuilding mode, performs RoundRobin.
16 : */
17 : class PreferSameHostPolicy : public RoutingManagerPolicy
18 : {
19 : public:
20 : /**
21 : * \brief PreferSameHostPolicy Constructor
22 : * \param ps ParameterSet used to configure PreferSameHostPolicy
23 : *
24 : * PreferSameHostPolicy accepts the following Parameter:
25 : * "minimum_participants" (Default: 0): Minimum number of receivers to distribute between. Use negative number to indicate how many can be missing from total. If the number of allowed missing receivers is greater than the number that exist, then the minimum number of participants will be set to 1.
26 : */
27 0 : explicit PreferSameHostPolicy(const fhicl::ParameterSet& ps)
28 0 : : RoutingManagerPolicy(ps)
29 0 : , minimum_participants_(ps.get<int>("minimum_participants", 0))
30 0 : , host_map_(MakeHostMap(ps))
31 : {
32 0 : }
33 :
34 : /**
35 : * \brief Default virtual Destructor
36 : */
37 0 : ~PreferSameHostPolicy() override = default;
38 :
39 : /**
40 : * \brief Generate a set of Routing Tables using received tokens
41 : * \param output The RoutingPacket to add entries to
42 : *
43 : * PreferSameHostPolicy will go through the list of receivers as many times
44 : * as it can, until one or more receivers have no tokens. It always does full
45 : * "turns" through the recevier list.
46 : */
47 : void CreateRoutingTable(detail::RoutingPacket& output) override;
48 : /**
49 : * @brief Get an artdaq::detail::RoutingPacketEntry for a given sequence ID and rank. Used by RequestBasedEventBuilder and DataFlow RoutingManagerMode
50 : * @param seq Sequence Number to get route for
51 : * @param requesting_rank Rank to route for
52 : * @return artdaq::detail::RoutingPacketEntry connecting sequence ID to destination rank
53 : */
54 : detail::RoutingPacketEntry CreateRouteForSequenceID(artdaq::Fragment::sequence_id_t seq, int requesting_rank) override;
55 :
56 : private:
57 : PreferSameHostPolicy(PreferSameHostPolicy const&) = delete;
58 : PreferSameHostPolicy(PreferSameHostPolicy&&) = delete;
59 : PreferSameHostPolicy& operator=(PreferSameHostPolicy const&) = delete;
60 : PreferSameHostPolicy& operator=(PreferSameHostPolicy&&) = delete;
61 :
62 : std::map<int, int> sortTokens_();
63 : void restoreUnusedTokens_(std::map<int, int> const& sorted_tokens_);
64 : int calculateMinimum_();
65 :
66 : int minimum_participants_;
67 : hostMap_t host_map_;
68 : };
69 :
70 0 : void PreferSameHostPolicy::CreateRoutingTable(detail::RoutingPacket& output)
71 : {
72 0 : TLOG(TLVL_DEBUG + 35) << "PreferSameHostPolicy::GetCurrentTable token list size is " << tokens_.size();
73 0 : auto table = sortTokens_();
74 0 : TLOG(TLVL_DEBUG + 36) << "PreferSameHostPolicy::GetCurrentTable table size is " << table.size();
75 :
76 0 : int minimum = calculateMinimum_();
77 0 : bool endCondition = table.size() < static_cast<size_t>(minimum);
78 0 : TLOG(TLVL_DEBUG + 37) << "PreferSameHostPolicy::GetCurrentTable initial endCondition is " << endCondition << ", minimum is " << minimum;
79 :
80 0 : while (!endCondition)
81 : {
82 0 : for (auto it = table.begin(); it != table.end();)
83 : {
84 0 : TLOG(TLVL_DEBUG + 38) << "PreferSameHostPolicy::GetCurrentTable assigning sequenceID " << next_sequence_id_ << " to rank " << it->first;
85 0 : output.emplace_back(detail::RoutingPacketEntry(next_sequence_id_++, it->first));
86 0 : table[it->first]--;
87 :
88 0 : if (table[it->first] <= 0)
89 : {
90 0 : it = table.erase(it);
91 : }
92 : else
93 : {
94 0 : ++it;
95 : }
96 : }
97 0 : endCondition = table.size() < static_cast<size_t>(minimum);
98 : }
99 :
100 0 : restoreUnusedTokens_(table);
101 0 : TLOG(TLVL_DEBUG + 36) << "PreferSameHostPolicy::GetCurrentTable " << tokens_.size() << " unused tokens will be saved for later";
102 :
103 0 : TLOG(TLVL_DEBUG + 35) << "PreferSameHostPolicy::GetCurrentTable return with table size " << output.size();
104 0 : }
105 0 : detail::RoutingPacketEntry PreferSameHostPolicy::CreateRouteForSequenceID(artdaq::Fragment::sequence_id_t seq, int requesting_rank)
106 : {
107 0 : detail::RoutingPacketEntry output;
108 0 : auto table = sortTokens_();
109 :
110 : // Trivial case: no tokens
111 0 : if (table.empty()) return output;
112 :
113 0 : if (host_map_.count(requesting_rank) == 0)
114 : {
115 0 : TLOG(TLVL_WARNING) << "Received Routing Request from rank " << requesting_rank << ", which is not in my Host Map!";
116 : }
117 0 : auto host = host_map_[requesting_rank];
118 :
119 : // First try to find a match
120 0 : std::set<int> matching_ranks_;
121 0 : int max_rank = -1;
122 0 : int max_rank_tokens = 0;
123 0 : for (auto& entry : table)
124 : {
125 0 : if (entry.second == 0) continue;
126 0 : if (host_map_.count(entry.first) == 0)
127 : {
128 0 : TLOG(TLVL_WARNING) << "Receiver rank " << entry.first << " is not in the host map! Is this policy configured correctly?!";
129 : }
130 : else
131 : {
132 0 : if (host_map_[entry.first] == host)
133 : {
134 0 : matching_ranks_.insert(entry.first);
135 : }
136 : }
137 0 : if (entry.second > max_rank_tokens)
138 : {
139 0 : max_rank = entry.first;
140 0 : max_rank_tokens = entry.second;
141 : }
142 : }
143 :
144 0 : if (matching_ranks_.size() == 0)
145 : {
146 0 : output = detail::RoutingPacketEntry(seq, max_rank);
147 0 : table[max_rank]--;
148 : }
149 0 : else if (matching_ranks_.size() == 1)
150 : {
151 0 : output = detail::RoutingPacketEntry(seq, *matching_ranks_.begin());
152 0 : table[*matching_ranks_.begin()]--;
153 : }
154 : else
155 : {
156 : // Find the most tokens in matching_ranks_
157 0 : int max = 0;
158 0 : int max_rank = -1;
159 0 : for (auto& rank : matching_ranks_)
160 : {
161 0 : if (table[rank] > max)
162 : {
163 0 : max = table[rank];
164 0 : max_rank = rank;
165 : }
166 : }
167 0 : output = detail::RoutingPacketEntry(seq, max_rank);
168 0 : table[max_rank]--;
169 : }
170 :
171 0 : restoreUnusedTokens_(table);
172 0 : return output;
173 0 : }
174 0 : std::map<int, int> PreferSameHostPolicy::sortTokens_()
175 : {
176 0 : auto output = std::map<int, int>();
177 0 : for (auto token : tokens_)
178 : {
179 0 : output[token]++;
180 : }
181 0 : tokens_.clear();
182 0 : return output;
183 0 : }
184 :
185 0 : void PreferSameHostPolicy::restoreUnusedTokens_(std::map<int, int> const& sorted_tokens)
186 : {
187 0 : for (auto r : sorted_tokens)
188 : {
189 0 : for (auto i = 0; i < r.second; ++i)
190 : {
191 0 : tokens_.push_back(r.first);
192 : }
193 : }
194 0 : }
195 :
196 0 : int PreferSameHostPolicy::calculateMinimum_()
197 : {
198 : // If 0 or negative, add minimum_participants_ to GetRecevierCount to ensure that it's correct
199 : // 02-Apr-2019, KAB: changed the declared type of "minimum" from 'auto' to 'int' to avoid the
200 : // situation in which the compiler chooses a type of 'unsigned int', the minimum_participants_ is
201 : // a negative number that is larger (in absolute value) to the receiver count, and "minimum"
202 : // ends up with a large positive value.
203 0 : int minimum = minimum_participants_ > 0 ? minimum_participants_ : GetReceiverCount() + minimum_participants_;
204 0 : if (minimum < 1)
205 : {
206 0 : minimum = 1; // Can't go below 1
207 : }
208 0 : return minimum;
209 : }
210 :
211 : } // namespace artdaq
212 :
// Invokes the DEFINE_ARTDAQ_ROUTING_POLICY macro for artdaq::PreferSameHostPolicy.
// NOTE(review): presumably defined in PolicyMacros.hh and used to expose this class as a
// loadable routing-policy plugin - confirm against the macro definition.
213 0 : DEFINE_ARTDAQ_ROUTING_POLICY(artdaq::PreferSameHostPolicy)
|