Line data Source code
1 : #ifndef ARTDAQ_DAQRATE_DATATRANSFERMANAGER_HH
2 : #define ARTDAQ_DAQRATE_DATATRANSFERMANAGER_HH
3 :
4 : #include "TRACE/tracemf.h" // Pre-empt TRACE/trace.h from Fragment.hh.
5 : #include "artdaq-core/Data/Fragment.hh"
6 :
7 : #include <condition_variable>
8 : #include <map>
9 : #include <memory>
10 : #include <set>
11 :
12 : #include "artdaq/DAQrate/SharedMemoryEventManager.hh"
13 : #include "artdaq/DAQrate/detail/FragCounter.hh"
14 : #include "artdaq/TransferPlugins/TransferInterface.hh"
15 :
16 : namespace fhicl {
17 : class ParameterSet; // Forward declaration; this header only uses it as a const reference parameter
18 : }
19 :
20 : namespace artdaq {
21 : class DataReceiverManager; // Forward declaration so the class can be defined below with a qualified name
22 : }
23 :
24 : /**
25 : * \brief Receives Fragment objects from one or more DataSenderManager instances using TransferInterface plugins
26 : * DataReceiverManager runs a reception thread for each source, and can automatically suppress reception from
27 : * sources which are going faster than the others.
28 : */
29 : class artdaq::DataReceiverManager
30 : {
31 : public:
32 : /**
33 : * \brief DataReceiverManager Constructor
34 : * \param ps ParameterSet used to configure the DataReceiverManager
35 : * \param shm Pointer to SharedMemoryEventManager instance (destination for received data)
36 : *
37 : * \verbatim
38 : * DataReceiverManager accepts the following Parameters:
39 : * "auto_suppression_enabled" (Default: true): Whether to suppress a source that gets too far ahead
40 : * "max_receive_difference" (Default: 50): Threshold (in sequence ID) for suppressing a source
41 : * "receive_timeout_usec" (Default: 100000): The timeout for receive operations
42 : * "enabled_sources" (OPTIONAL): List of sources which are enabled. If not specified, all sources are assumed enabled
43 : * "sources" (Default: blank table): FHiCL table containing TransferInterface configurations for each source.
44 : * NOTE: "source_rank" MUST be specified (and unique) for each source!
45 : * \endverbatim
46 : */
47 : explicit DataReceiverManager(const fhicl::ParameterSet& ps, std::shared_ptr<SharedMemoryEventManager> shm);
48 :
49 : /**
50 : * \brief DataReceiverManager Destructor
51 : */
52 : virtual ~DataReceiverManager();
53 :
54 : /**
55 : * \brief Return the count of Fragment objects received by this DataReceiverManager
56 : * \return The count of Fragment objects received by this DataReceiverManager
57 : */
58 : size_t count() const;
59 :
60 : /**
61 : * \brief Get the count of Fragment objects received by this DataReceiverManager from a given source
62 : * \param rank Source rank to get count for
63 : * \return The count of Fragment objects received by this DataReceiverManager from the source
64 : */
65 : size_t slotCount(size_t rank) const;
66 :
67 : /**
68 : * \brief Get the total size of all data received by this DataReceiverManager
69 : * \return The total size of all data received by this DataReceiverManager
70 : */
71 : size_t byteCount() const;
72 :
73 : /**
74 : * \brief Start receiver threads for all enabled sources
75 : */
76 : void start_threads();
77 :
78 : /**
79 : * \brief Stop receiver threads
80 : */
81 : void stop_threads();
82 :
83 : /**
84 : * \brief Get the list of enabled sources
85 : * \return The list of enabled sources
86 : */
87 : std::set<int> enabled_sources() const;
88 :
89 : /**
90 : * \brief Get the list of sources which are still receiving data
91 : * \return std::set containing ranks of sources which are still receiving data
92 : */
93 : std::set<int> running_sources() const;
94 :
95 : /**
96 : * \brief Get a handle to the SharedMemoryEventManager connected to this DataReceiverManager
97 : * \return shared_ptr to SharedMemoryEventManager instance
98 : */
99 3 : std::shared_ptr<SharedMemoryEventManager> getSharedMemoryEventManager() const { return shm_manager_; }
100 :
101 : private:
102 : DataReceiverManager(DataReceiverManager const&) = delete; // Copy and move construction/assignment are all deleted
103 : DataReceiverManager(DataReceiverManager&&) = delete;
104 : DataReceiverManager& operator=(DataReceiverManager const&) = delete;
105 : DataReceiverManager& operator=(DataReceiverManager&&) = delete;
106 :
107 : void runReceiver_(int); // Parameter is unnamed here; presumably the source rank handled by one receiver thread (confirm in the .cc)
108 :
109 : std::atomic<bool> stop_requested_; // Presumably set by stop_threads() (TODO confirm). NOTE(review): <atomic> is not included directly by this header
110 : std::atomic<size_t> stop_requested_time_; // Presumably a timestamp recorded when the stop was requested; units not visible here
111 :
112 : std::map<int, boost::thread> source_threads_; // Presumably source rank -> receiver thread. NOTE(review): no boost thread header is included directly here
113 : std::map<int, std::unique_ptr<TransferInterface>> source_plugins_; // Presumably source rank -> owned TransferInterface plugin
114 :
115 : std::unordered_map<int, std::atomic<bool>> enabled_sources_; // Presumably backs enabled_sources(). NOTE(review): <unordered_map> is not included directly here
116 : std::unordered_map<int, std::atomic<bool>> running_sources_; // Presumably backs running_sources()
117 :
118 : detail::FragCounter recv_frag_count_; // Number of frags received per source; read by count() and slotCount().
119 : detail::FragCounter recv_frag_size_; // Number of bytes received per source; read by byteCount().
120 : detail::FragCounter recv_seq_count_; // For counting sequence IDs
121 :
122 : size_t receive_timeout_; // Presumably holds "receive_timeout_usec" (microseconds) - TODO confirm
123 : size_t stop_timeout_ms_; // NOTE(review): no matching parameter is listed in the constructor documentation
124 : std::shared_ptr<SharedMemoryEventManager> shm_manager_; // Returned by getSharedMemoryEventManager()
125 :
126 : bool non_reliable_mode_enabled_; // NOTE(review): non-reliable mode is not described in the constructor parameter list
127 : size_t non_reliable_mode_retry_count_; // NOTE(review): configuration key and meaning not visible in this header
128 : };
129 : // Inline definition of count(): returns recv_frag_count_.count().
130 : inline size_t
131 6 : artdaq::DataReceiverManager::
132 : count() const
133 : {
134 6 : return recv_frag_count_.count();
135 : }
136 : // Inline definition of slotCount(): passes rank through to recv_frag_count_.slotCount(rank).
137 : inline size_t
138 6 : artdaq::DataReceiverManager::
139 : slotCount(size_t rank) const
140 : {
141 6 : return recv_frag_count_.slotCount(rank);
142 : }
143 : // Inline definition of byteCount(): returns recv_frag_size_.count(), the byte counter rather than the fragment counter.
144 : inline size_t
145 3 : artdaq::DataReceiverManager::
146 : byteCount() const
147 : {
148 3 : return recv_frag_size_.count();
149 : }
150 : #endif // ARTDAQ_DAQRATE_DATATRANSFERMANAGER_HH
|