Line data Source code
1 : #include <cmath>
2 : #include "artdaq/RoutingPolicies/PolicyMacros.hh"
3 : #include "artdaq/RoutingPolicies/RoutingManagerPolicy.hh"
4 : #include "fhiclcpp/ParameterSet.h"
5 :
6 : namespace artdaq {
7 : /**
8 : * \brief A RoutingManagerPolicy which tries to fully load the first receiver, then the second, and so on
9 : */
10 : class CapacityTestPolicy : public RoutingManagerPolicy
11 : {
12 : public:
13 : /**
14 : * \brief CapacityTestPolicy Constructor
15 : * \param ps ParameterSet used to configure the CapacityTestPolicy
16 : *
17 : * \verbatim
18 : * CapacityTestPolicy accepts the following Parameters:
19 : * "tokens_used_per_table_percent" (Default: 50): Percentage of available tokens to be used on each iteration.
20 : * \endverbatim
21 : */
22 : explicit CapacityTestPolicy(const fhicl::ParameterSet& ps);
23 :
24 : /**
25 : * \brief Default virtual Destructor
26 : */
27 0 : ~CapacityTestPolicy() override = default;
28 :
29 : /**
30 : * @brief Add entries to the given RoutingPacket using currently-held tokens
31 : * @param output RoutingPacket to add entries to
32 : *
33 : * CapacityTestPolicy will assign available tokens from the first receiver, then the second, and so on
34 : * until it has assigned tokens equal to the inital_token_count * tokens_used_per_table_percent / 100.
35 : * The idea is that in steady-state, the load on the receivers should reflect the workload relative to
36 : * the capacity of the system. (i.e. if you have 5 receivers, and 3 of them are 100% busy, then your load
37 : * factor is approximately 60%.)
38 : */
39 : virtual void CreateRoutingTable(detail::RoutingPacket& output) override;
40 : /**
41 : * @brief Get an artdaq::detail::RoutingPacketEntry for a given sequence ID and rank. Used by RequestBasedEventBuilder and DataFlow RoutingManagerMode
42 : * @param seq Sequence Number to get route for
43 : * @param requesting_rank Rank to route for
44 : * @return artdaq::detail::RoutingPacketEntry connecting sequence ID to destination rank
45 : */
46 : virtual detail::RoutingPacketEntry CreateRouteForSequenceID(artdaq::Fragment::sequence_id_t seq, int requesting_rank) override;
47 :
48 : private:
49 : std::pair<size_t, std::map<int, int>> sortTokens_();
50 : void restoreUnusedTokens_(std::map<int, int> const& sorted_tokens);
51 :
52 : CapacityTestPolicy(CapacityTestPolicy const&) = delete;
53 : CapacityTestPolicy(CapacityTestPolicy&&) = delete;
54 : CapacityTestPolicy& operator=(CapacityTestPolicy const&) = delete;
55 : CapacityTestPolicy& operator=(CapacityTestPolicy&&) = delete;
56 :
57 : int tokenUsagePercent_;
58 : };
59 :
60 0 : CapacityTestPolicy::CapacityTestPolicy(const fhicl::ParameterSet& ps)
61 : : RoutingManagerPolicy(ps)
62 0 : , tokenUsagePercent_(ps.get<int>("tokens_used_per_table_percent", 50))
63 0 : {}
64 :
65 0 : void CapacityTestPolicy::CreateRoutingTable(detail::RoutingPacket& output)
66 : {
67 0 : auto sorted_tokens = sortTokens_();
68 0 : size_t tokenCount = sorted_tokens.first;
69 0 : auto table = sorted_tokens.second;
70 0 : size_t tokensToUse = ceil(tokenCount * tokenUsagePercent_ / 100.0);
71 :
72 0 : for (auto r : table)
73 : {
74 0 : bool breakCondition = false;
75 0 : while (table[r.first] > 0)
76 : {
77 0 : output.emplace_back(detail::RoutingPacketEntry(next_sequence_id_++, r.first));
78 0 : table[r.first]--;
79 0 : tokens_used_since_last_update_++;
80 0 : if (tokens_used_since_last_update_ >= tokensToUse)
81 : {
82 0 : breakCondition = true;
83 0 : break;
84 : }
85 : }
86 0 : if (breakCondition)
87 : {
88 0 : break;
89 : }
90 : }
91 :
92 0 : restoreUnusedTokens_(table);
93 0 : }
94 :
95 0 : detail::RoutingPacketEntry CapacityTestPolicy::CreateRouteForSequenceID(artdaq::Fragment::sequence_id_t seq, int)
96 : {
97 : // TODO, ELF 09/22/2020: Do we want to use the tokens_used_per_table_percent limitation here, too?
98 0 : detail::RoutingPacketEntry output;
99 0 : auto sorted_tokens = sortTokens_();
100 0 : if (sorted_tokens.first == 0) return output; // Trivial case: no tokens
101 0 : auto dest = sorted_tokens.second.begin()->first;
102 0 : sorted_tokens.second[dest]--;
103 0 : tokens_used_since_last_update_++;
104 0 : restoreUnusedTokens_(sorted_tokens.second);
105 :
106 0 : output = detail::RoutingPacketEntry(seq, dest);
107 :
108 0 : return output;
109 0 : }
110 :
111 0 : std::pair<size_t, std::map<int, int>> CapacityTestPolicy::sortTokens_()
112 : {
113 0 : auto output = std::make_pair(0, std::map<int, int>());
114 0 : for (auto token : tokens_)
115 : {
116 0 : output.second[token]++;
117 0 : output.first++;
118 : }
119 0 : tokens_.clear();
120 0 : return output;
121 0 : }
122 :
123 0 : void CapacityTestPolicy::restoreUnusedTokens_(std::map<int, int> const& sorted_tokens)
124 : {
125 0 : for (auto r : sorted_tokens)
126 : {
127 0 : for (auto i = 0; i < r.second; ++i)
128 : {
129 0 : tokens_.push_back(r.first);
130 : }
131 : }
132 0 : }
133 :
134 : } // namespace artdaq
135 :
// NOTE(review): presumably expands to the plugin registration code for this policy; the macro is not defined in this file (likely artdaq/RoutingPolicies/PolicyMacros.hh) - confirm there.
136 0 : DEFINE_ARTDAQ_ROUTING_POLICY(artdaq::CapacityTestPolicy)
|