Line data Source code
1 : #ifndef ARTDAQ_DAQRATE_DATASENDERMANAGER_HH
2 : #define ARTDAQ_DAQRATE_DATASENDERMANAGER_HH
3 :
4 : #include "fhiclcpp/types/Sequence.h" // Must pre-empt fhiclcpp/types/Atom.h
5 :
6 : #include "TRACE/tracemf.h" // Pre-empt TRACE/trace.h from Fragment.hh.
7 : #include "artdaq-core/Data/Fragment.hh"
8 :
9 : #include "artdaq/DAQdata/HostMap.hh"
10 : #include "artdaq/DAQrate/detail/FragCounter.hh"
11 : #include "artdaq/DAQrate/detail/TableReceiver.hh"
12 : #include "artdaq/TransferPlugins/TransferInterface.hh"
13 :
14 : #include "fhiclcpp/types/Atom.h"
15 : #include "fhiclcpp/types/Comment.h"
16 : #include "fhiclcpp/types/ConfigurationTable.h"
17 : #include "fhiclcpp/types/Name.h"
18 : #include "fhiclcpp/types/OptionalTable.h"
19 : #include "fhiclcpp/types/TableFragment.h"
20 :
21 : #include <netinet/in.h>
22 : #include <atomic>
23 : #include <map>
24 : #include <memory>
25 : #include <set>
26 :
27 : namespace artdaq {
28 : class DataSenderManager;
29 : }
30 :
31 : /**
32 : * \brief Sends Fragment objects using TransferInterface plugins. Uses Routing Tables if confgiured,
33 : * otherwise will Round-Robin Fragments to the destinations.
34 : */
35 : class artdaq::DataSenderManager
36 : {
37 : public:
38 : /// <summary>
39 : /// Configuration for transfers to destinations
40 : /// </summary>
41 : struct DestinationsConfig
42 : {
43 : /// Example Configuration for transfer to destination. See artdaq::TransferInterface::Config
44 : fhicl::OptionalTable<artdaq::TransferInterface::Config> dest{fhicl::Name{"d1"}, fhicl::Comment{"Configuration for transfer to destination"}};
45 : };
46 :
47 : /// <summary>
48 : /// Configuration of DataSenderManager. May be used for parameter validation
49 : /// </summary>
50 : struct Config
51 : {
52 : /// "broadcast_sends" (Default: false): Send all Fragments to all destinations
53 : fhicl::Atom<bool> broadcast_sends{fhicl::Name{"broadcast_sends"}, fhicl::Comment{"Send all Fragments to all destinations"}, false};
54 : /// "nonblocking_sends" (Default: false): If true, will use non-reliable mode of TransferInterface plugins
55 : fhicl::Atom<bool> nonblocking_sends{fhicl::Name{"nonblocking_sends"}, fhicl::Comment{"Whether sends should block. Used for DL->DISP connection."}, false};
56 : /// "send_timeout_usec" (Default: 5000000 (5 seconds): Timeout for sends in non-reliable modes (broadcast and nonblocking)
57 : fhicl::Atom<size_t> send_timeout_us{fhicl::Name{"send_timeout_usec"}, fhicl::Comment{"Timeout for sends in non-reliable modes (broadcast and nonblocking)"}, 5000000};
58 : /// "send_retry_count" (Default: 2): Number of times to retry a send in non-reliable mode
59 : fhicl::Atom<size_t> send_retry_count{fhicl::Name{"send_retry_count"}, fhicl::Comment{"Number of times to retry a send in non-reliable mode"}, 2};
60 : fhicl::OptionalTable<artdaq::TableReceiver::Config> routing_table_config{fhicl::Name{"routing_table_config"}}; ///< Configuration for Routing Table reception. See artdaq::DataSenderManager::RoutingTableConfig
61 : /// "destinations" (Default: Empty ParameterSet): FHiCL table for TransferInterface configurations for each destaintion. See artdaq::DataSenderManager::DestinationsConfig
62 : /// NOTE: "destination_rank" MUST be specified (and unique) for each destination!
63 : fhicl::OptionalTable<DestinationsConfig> destinations{fhicl::Name{"destinations"}};
64 : /// Optional host_map configuration (Can also be specified in each DestinationsConfig entry. See artdaq::HostMap::Config
65 : fhicl::TableFragment<artdaq::HostMap::Config> host_map;
66 : /// enabled_destinations" (OPTIONAL): If specified, only the destination ranks listed will be enabled. If not specified, all destinations will be enabled.
67 : fhicl::Sequence<size_t> enabled_destinations{fhicl::Name{"enabled_destinations"}, fhicl::Comment{"List of destiantion ranks to activate (must be defined in destinations block)"}, std::vector<size_t>()};
68 : };
69 : /// Used for ParameterSet validation (if desired)
70 : using Parameters = fhicl::WrappedTable<Config>;
71 :
72 : /**
73 : * \brief DataSenderManager Constructor
74 : * \param ps ParameterSet used to configure the DataSenderManager. See artdaq::DataSenderManager::Config
75 : */
76 : explicit DataSenderManager(const fhicl::ParameterSet& ps);
77 :
78 : /**
79 : * \brief DataSenderManager Destructor
80 : */
81 : virtual ~DataSenderManager();
82 :
83 : /**
84 : * \brief Send the given Fragment. Return the rank of the destination to which the Fragment was sent.
85 : * \param frag Fragment to sent
86 : * \return Pair containing Rank of destination for Fragment and the CopyStatus from the send call
87 : */
88 : std::pair<int, TransferInterface::CopyStatus> sendFragment(Fragment&& frag);
89 :
90 : /**
91 : * \brief Return the count of Fragment objects sent by this DataSenderManagerq
92 : * \return The count of Fragment objects sent by this DataSenderManager
93 : */
94 : size_t count() const;
95 :
96 : /**
97 : * \brief Get the count of Fragment objects sent by this DataSenderManager to a given destination
98 : * \param rank Destination rank to get count for
99 : * \return The count of Fragment objects sent by this DataSenderManager to the destination
100 : */
101 : size_t slotCount(size_t rank) const;
102 :
103 : /**
104 : * \brief Get the number of configured destinations
105 : * \return The number of configured destinations
106 : */
107 : size_t destinationCount() const { return destinations_.size(); }
108 :
109 : /**
110 : * \brief Get the list of enabled destinations
111 : * \return The list of enabled destiantion ranks
112 : */
113 : std::set<int> enabled_destinations() const { return enabled_destinations_; }
114 :
115 : /**
116 : * \brief Gets the current size of the Routing Table, in case other parts of the system want to use this information
117 : * \return The current size of the Routing Table.
118 : */
119 : size_t GetRoutingTableEntryCount() const;
120 :
121 : /**
122 : * \brief Gets the number of sends remaining in the routing table, in case other parts of the system want to use this information
123 : * \return The number of sends remaining in the routing table
124 : */
125 : size_t GetRemainingRoutingTableEntries() const;
126 :
127 : /**
128 : * \brief Stop the DataSenderManager, aborting any sends in progress
129 : */
130 0 : void StopSender() { should_stop_ = true; }
131 :
132 : /**
133 : * \brief Remove the given sequence ID from the routing table and sent_count lists
134 : * \param seq Sequence ID to remove
135 : */
136 : void RemoveRoutingTableEntry(Fragment::sequence_id_t seq);
137 : /**
138 : * \brief Get the number of Fragments sent with a given Sequence ID
139 : * \param seq Sequence ID to query
140 : * \return The number of Fragments sent with a given Sequence ID
141 : */
142 : size_t GetSentSequenceIDCount(Fragment::sequence_id_t seq);
143 :
144 : private:
145 : DataSenderManager(DataSenderManager const&) = delete;
146 : DataSenderManager(DataSenderManager&&) = delete;
147 : DataSenderManager& operator=(DataSenderManager const&) = delete;
148 : DataSenderManager& operator=(DataSenderManager&&) = delete;
149 :
150 : // Calculate where the fragment with this sequenceID should go.
151 : int calcDest_(Fragment::sequence_id_t) const;
152 :
153 : private:
154 : std::map<int, std::unique_ptr<artdaq::TransferInterface>> destinations_;
155 : std::set<int> enabled_destinations_;
156 :
157 : detail::FragCounter sent_frag_count_;
158 :
159 : bool broadcast_sends_;
160 : bool non_blocking_mode_;
161 : size_t send_timeout_us_;
162 : size_t send_retry_count_;
163 :
164 : std::unique_ptr<TableReceiver> table_receiver_;
165 : std::atomic<bool> should_stop_;
166 : std::map<Fragment::sequence_id_t, size_t> sent_sequence_id_count_;
167 :
168 : mutable std::mutex sent_sequence_id_mutex_;
169 :
170 : mutable std::atomic<uint64_t> highest_sequence_id_routed_;
171 : };
172 :
173 : inline size_t
174 0 : artdaq::DataSenderManager::
175 : count() const
176 : {
177 0 : return sent_frag_count_.count();
178 : }
179 :
180 : inline size_t
181 : artdaq::DataSenderManager::
182 : slotCount(size_t rank) const
183 : {
184 : return sent_frag_count_.slotCount(rank);
185 : }
186 : #endif // ARTDAQ_DAQRATE_DATASENDERMANAGER_HH
|