Line data Source code
1 : #define TRACE_NAME "RoutingManagerPolicy"
2 : #include "TRACE/tracemf.h"
3 :
4 : #include "artdaq/DAQdata/Globals.hh"
5 : #include "artdaq/RoutingPolicies/RoutingManagerPolicy.hh"
6 :
7 : #include "fhiclcpp/ParameterSet.h"
8 :
9 21 : artdaq::RoutingManagerPolicy::RoutingManagerPolicy(const fhicl::ParameterSet& ps)
10 21 : : tokens_used_since_last_update_(0)
11 21 : , next_sequence_id_(1)
12 42 : , max_token_count_(0)
13 : {
14 63 : routing_mode_ = detail::RoutingManagerModeConverter::stringToRoutingManagerMode(ps.get<std::string>("routing_manager_mode", "EventBuilding"));
15 42 : routing_cache_max_size_ = ps.get<size_t>("routing_cache_size", 1000);
16 21 : }
17 :
18 48 : artdaq::detail::RoutingPacket artdaq::RoutingManagerPolicy::GetCurrentTable()
19 : {
20 48 : auto table = detail::RoutingPacket();
21 :
22 48 : if (routing_mode_ == detail::RoutingManagerMode::EventBuilding)
23 : {
24 36 : std::lock_guard<std::mutex> lk(tokens_mutex_);
25 36 : CreateRoutingTable(table);
26 36 : UpdateCache(table);
27 36 : CreateRoutingTableFromCache(table);
28 36 : }
29 48 : if (routing_mode_ == detail::RoutingManagerMode::RequestBasedEventBuilding)
30 : {
31 12 : CreateRoutingTableFromCache(table);
32 : }
33 :
34 48 : return table;
35 0 : }
36 :
37 138 : void artdaq::RoutingManagerPolicy::AddReceiverToken(int rank, unsigned new_slots_free)
38 : {
39 138 : if (receiver_ranks_.count(rank) == 0u)
40 : {
41 189 : TLOG(TLVL_INFO) << "Adding rank " << rank << " to receivers list (initial tokens=" << new_slots_free << ")";
42 63 : receiver_ranks_.insert(rank);
43 : }
44 276 : TLOG(TLVL_DEBUG + 35) << "AddReceiverToken BEGIN";
45 138 : std::lock_guard<std::mutex> lk(tokens_mutex_);
46 138 : if (new_slots_free == 1)
47 : {
48 111 : tokens_.push_back(rank);
49 : }
50 : else
51 : {
52 : // Randomly distribute multitokens through the token list
53 : // Only used at start run time, so we can take the performance hit
54 105 : for (unsigned i = 0; i < new_slots_free; ++i)
55 : {
56 78 : auto it = tokens_.begin();
57 78 : if (!tokens_.empty())
58 : {
59 72 : std::advance(it, rand() % tokens_.size()); // NOLINT(cert-msc50-cpp)
60 : }
61 78 : tokens_.insert(it, rank);
62 : }
63 : }
64 138 : if (tokens_.size() > max_token_count_)
65 : {
66 91 : max_token_count_ = tokens_.size();
67 : }
68 276 : TLOG(TLVL_DEBUG + 35) << "AddReceiverToken END";
69 138 : }
70 :
71 18 : void artdaq::RoutingManagerPolicy::Reset()
72 : {
73 18 : next_sequence_id_ = 1;
74 18 : std::unique_lock<std::mutex> lk(tokens_mutex_);
75 18 : tokens_.clear();
76 18 : receiver_ranks_.clear();
77 18 : }
78 :
79 73 : artdaq::detail::RoutingPacketEntry artdaq::RoutingManagerPolicy::GetRouteForSequenceID(artdaq::Fragment::sequence_id_t seq, int requesting_rank)
80 : {
81 73 : if (routing_mode_ != detail::RoutingManagerMode::DataFlow)
82 : {
83 43 : std::lock_guard<std::mutex> lk(routing_cache_mutex_);
84 43 : if (routing_cache_.count(seq))
85 : {
86 16 : return detail::RoutingPacketEntry(seq, routing_cache_[seq][0].destination_rank);
87 : }
88 : else
89 : {
90 27 : std::lock_guard<std::mutex> tlk(tokens_mutex_);
91 27 : auto entry = CreateRouteForSequenceID(seq, requesting_rank);
92 27 : if (entry.sequence_id == seq)
93 : {
94 23 : routing_cache_[seq].emplace_back(seq, entry.destination_rank, requesting_rank);
95 : }
96 27 : return entry;
97 27 : }
98 43 : }
99 : else
100 : {
101 30 : std::lock_guard<std::mutex> lk(routing_cache_mutex_);
102 30 : if (routing_cache_.count(seq))
103 : {
104 40 : for (auto& entry : routing_cache_[seq])
105 : {
106 27 : if (entry.requesting_rank == requesting_rank)
107 : {
108 5 : return detail::RoutingPacketEntry(seq, entry.destination_rank);
109 : }
110 : }
111 : }
112 :
113 25 : std::lock_guard<std::mutex> tlk(tokens_mutex_);
114 25 : auto entry = CreateRouteForSequenceID(seq, requesting_rank);
115 25 : if (entry.sequence_id == seq)
116 : {
117 24 : routing_cache_[seq].emplace_back(seq, entry.destination_rank, requesting_rank);
118 : }
119 :
120 25 : TrimRoutingCache();
121 25 : return entry;
122 30 : }
123 : return detail::RoutingPacketEntry();
124 : }
125 :
126 37 : void artdaq::RoutingManagerPolicy::TrimRoutingCache()
127 : {
128 52 : while (routing_cache_.size() > routing_cache_max_size_)
129 : {
130 15 : routing_cache_.erase(routing_cache_.begin());
131 : }
132 37 : }
133 :
134 36 : void artdaq::RoutingManagerPolicy::UpdateCache(detail::RoutingPacket& table)
135 : {
136 36 : std::lock_guard<std::mutex> lk(routing_cache_mutex_);
137 :
138 163 : for (auto& entry : table)
139 : {
140 127 : if (!routing_cache_.count(entry.sequence_id))
141 : {
142 126 : routing_cache_[entry.sequence_id].emplace_back(entry.sequence_id, entry.destination_rank, my_rank);
143 : }
144 : }
145 36 : }
146 :
147 48 : void artdaq::RoutingManagerPolicy::CreateRoutingTableFromCache(detail::RoutingPacket& table)
148 : {
149 48 : std::lock_guard<std::mutex> lk(routing_cache_mutex_);
150 :
151 48 : if (routing_mode_ == detail::RoutingManagerMode::RequestBasedEventBuilding)
152 : {
153 47 : for (auto& cache_entry : routing_cache_)
154 : {
155 35 : if (!cache_entry.second[0].included_in_table)
156 : {
157 23 : table.push_back(artdaq::detail::RoutingPacketEntry(cache_entry.second[0].sequence_id, cache_entry.second[0].destination_rank));
158 23 : cache_entry.second[0].included_in_table = true;
159 : }
160 : }
161 :
162 12 : TrimRoutingCache();
163 : }
164 48 : }
|