Line data Source code
1 : #include "artdaq/DAQdata/Globals.hh"
2 : #define TRACE_NAME (app_name + "_RoundRobin_policy").c_str()
3 : #include "TRACE/tracemf.h"
4 :
5 : #include "artdaq/RoutingPolicies/PolicyMacros.hh"
6 : #include "artdaq/RoutingPolicies/RoutingManagerPolicy.hh"
7 :
8 : #include "fhiclcpp/ParameterSet.h"
9 :
10 : namespace artdaq {
11 : /**
12 : * \brief A RoutingManagerPolicy which evenly distributes Sequence IDs to all receivers.
13 : * If an uneven number of tokens have been received, extra tokens are stored for the next table update.
14 : */
15 : class RoundRobinPolicy : public RoutingManagerPolicy
16 : {
17 : public:
18 : /**
19 : * \brief RoundRobinPolicy Constructor
20 : * \param ps ParameterSet used to configure RoundRobinPolicy
21 : *
22 : * RoundRobinPolicy accepts the following Parameter:
23 : * "minimum_participants" (Default: 0): Minimum number of receivers to distribute between. Use negative number to indicate how many can be missing from total. If the number of allowed missing receivers is greater than the number that exist, then the minimum number of participants will be set to 1.
24 : */
25 0 : explicit RoundRobinPolicy(const fhicl::ParameterSet& ps)
26 0 : : RoutingManagerPolicy(ps)
27 0 : , minimum_participants_(ps.get<int>("minimum_participants", 0))
28 : {
29 0 : }
30 :
31 : /**
32 : * \brief Default virtual Destructor
33 : */
34 0 : ~RoundRobinPolicy() override = default;
35 :
36 : /**
37 : * @brief Add entries to the given RoutingPacket using currently-held tokens
38 : * @param output RoutingPacket to add entries to
39 : *
40 : * RoundRobinPolicy will go through the list of receivers as many times
41 : * as it can, until one or more receivers have no tokens. It always does full
42 : * "turns" through the recevier list.
43 : */
44 : void CreateRoutingTable(detail::RoutingPacket& output) override;
45 : /**
46 : * @brief Get an artdaq::detail::RoutingPacketEntry for a given sequence ID and rank. Used by RequestBasedEventBuilder and DataFlow RoutingManagerMode
47 : * @param seq Sequence Number to get route for
48 : * @param requesting_rank Rank to route for
49 : * @return artdaq::detail::RoutingPacketEntry connecting sequence ID to destination rank
50 : */
51 : detail::RoutingPacketEntry CreateRouteForSequenceID(artdaq::Fragment::sequence_id_t seq, int requesting_rank) override;
52 :
53 : private:
54 : RoundRobinPolicy(RoundRobinPolicy const&) = delete;
55 : RoundRobinPolicy(RoundRobinPolicy&&) = delete;
56 : RoundRobinPolicy& operator=(RoundRobinPolicy const&) = delete;
57 : RoundRobinPolicy& operator=(RoundRobinPolicy&&) = delete;
58 :
59 : std::map<int, int> sortTokens_();
60 : void restoreUnusedTokens_(std::map<int, int> const& sorted_tokens_);
61 : int calculateMinimum_();
62 :
63 : int minimum_participants_;
64 : std::set<int> receivers_in_current_round_;
65 : };
66 :
67 0 : void RoundRobinPolicy::CreateRoutingTable(detail::RoutingPacket& output)
68 : {
69 0 : TLOG(TLVL_DEBUG + 35) << "RoundRobinPolicy::GetCurrentTable token list size is " << tokens_.size();
70 0 : auto table = sortTokens_();
71 0 : TLOG(TLVL_DEBUG + 36) << "RoundRobinPolicy::GetCurrentTable table size is " << table.size();
72 :
73 0 : int minimum = calculateMinimum_();
74 0 : bool endCondition = table.size() < static_cast<size_t>(minimum);
75 0 : TLOG(TLVL_DEBUG + 37) << "RoundRobinPolicy::GetCurrentTable initial endCondition is " << endCondition << ", minimum is " << minimum;
76 :
77 0 : while (!endCondition)
78 : {
79 0 : for (auto it = table.begin(); it != table.end();)
80 : {
81 0 : TLOG(TLVL_DEBUG + 38) << "RoundRobinPolicy::GetCurrentTable assigning sequenceID " << next_sequence_id_ << " to rank " << it->first;
82 0 : output.emplace_back(detail::RoutingPacketEntry(next_sequence_id_++, it->first));
83 0 : table[it->first]--;
84 :
85 0 : if (table[it->first] <= 0)
86 : {
87 0 : it = table.erase(it);
88 : }
89 : else
90 : {
91 0 : ++it;
92 : }
93 : }
94 0 : endCondition = table.size() < static_cast<size_t>(minimum);
95 : }
96 :
97 0 : restoreUnusedTokens_(table);
98 0 : TLOG(TLVL_DEBUG + 36) << "RoundRobinPolicy::GetCurrentTable " << tokens_.size() << " unused tokens will be saved for later";
99 :
100 0 : TLOG(TLVL_DEBUG + 35) << "RoundRobinPolicy::GetCurrentTable return with table size " << output.size();
101 0 : }
102 0 : detail::RoutingPacketEntry RoundRobinPolicy::CreateRouteForSequenceID(artdaq::Fragment::sequence_id_t seq, int)
103 : {
104 0 : detail::RoutingPacketEntry output;
105 :
106 : // Trivial case: We've already started a round
107 0 : if (!receivers_in_current_round_.empty())
108 : {
109 0 : output = detail::RoutingPacketEntry(seq, *receivers_in_current_round_.begin());
110 0 : receivers_in_current_round_.erase(receivers_in_current_round_.begin());
111 0 : return output;
112 : }
113 :
114 : // We need to set up the next round...
115 0 : auto table = sortTokens_();
116 :
117 : // Can't route anything until a full "turn" is available
118 0 : int minimum = calculateMinimum_();
119 0 : if (table.size() < static_cast<size_t>(minimum))
120 : {
121 0 : TLOG(TLVL_WARNING) << "Do not have tokens from a minimum set of receivers to start a round";
122 : }
123 : else
124 : {
125 0 : for (auto& entry : table)
126 : {
127 0 : receivers_in_current_round_.insert(entry.first);
128 0 : entry.second--;
129 : }
130 0 : output = detail::RoutingPacketEntry(seq, *receivers_in_current_round_.begin());
131 0 : receivers_in_current_round_.erase(receivers_in_current_round_.begin());
132 : }
133 :
134 0 : restoreUnusedTokens_(table);
135 0 : return output;
136 0 : }
137 0 : std::map<int, int> RoundRobinPolicy::sortTokens_()
138 : {
139 0 : auto output = std::map<int, int>();
140 0 : for (auto token : tokens_)
141 : {
142 0 : output[token]++;
143 : }
144 0 : tokens_.clear();
145 0 : return output;
146 0 : }
147 :
148 0 : void RoundRobinPolicy::restoreUnusedTokens_(std::map<int, int> const& sorted_tokens)
149 : {
150 0 : for (auto r : sorted_tokens)
151 : {
152 0 : for (auto i = 0; i < r.second; ++i)
153 : {
154 0 : tokens_.push_back(r.first);
155 : }
156 : }
157 0 : }
158 :
159 0 : int RoundRobinPolicy::calculateMinimum_()
160 : {
161 : // If 0 or negative, add minimum_participants_ to GetRecevierCount to ensure that it's correct
162 : // 02-Apr-2019, KAB: changed the declared type of "minimum" from 'auto' to 'int' to avoid the
163 : // situation in which the compiler chooses a type of 'unsigned int', the minimum_participants_ is
164 : // a negative number that is larger (in absolute value) to the receiver count, and "minimum"
165 : // ends up with a large positive value.
166 0 : int minimum = minimum_participants_ > 0 ? minimum_participants_ : GetReceiverCount() + minimum_participants_;
167 0 : if (minimum < 1)
168 : {
169 0 : minimum = 1; // Can't go below 1
170 : }
171 0 : return minimum;
172 : }
173 :
174 : } // namespace artdaq
175 :
176 0 : DEFINE_ARTDAQ_ROUTING_POLICY(artdaq::RoundRobinPolicy)
|