Line data Source code
1 : #ifndef artdaq_test_TransferPlugins_BrokenTransferTest_hh
2 : #define artdaq_test_TransferPlugins_BrokenTransferTest_hh
3 :
#include "TRACE/tracemf.h"  // Pre-empt TRACE/trace.h from Fragment.hh.
#include "artdaq-core/Data/Fragment.hh"

#include "artdaq-core/Core/SharedMemoryManager.hh"
#include "artdaq/TransferPlugins/TransferInterface.hh"

#include <fhiclcpp/ParameterSet.h>
#include <fhiclcpp/types/Atom.h>
#include <fhiclcpp/types/Comment.h>
#include <fhiclcpp/types/ConfigurationTable.h>
#include <fhiclcpp/types/Name.h>
#include <fhiclcpp/types/Table.h>

#include <boost/thread.hpp>

#include <array>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <map>
#include <mutex>
#include <set>
#include <string>
#include <thread>
#include <unordered_set>
22 :
23 : namespace artdaqtest {
24 : /// <summary>
25 : /// A class which simulates several failure modes for TransferPlugins such as sender pause/restart and receiver pause/restart
26 : /// </summary>
27 : class BrokenTransferTest
28 : {
29 : public:
30 : /// <summary>
31 : /// Configuration parameters for BrokenTransferTest
32 : /// </summary>
33 : struct Config
34 : {
35 : /// "fragment_rate_hz" (Default: 10): The rate at which to generate Fragments, in Hz
36 : fhicl::Atom<size_t> fragment_rate_hz{fhicl::Name{"fragment_rate_hz"}, fhicl::Comment{"The rate at which to generate Fragments, in Hz"}, 10};
37 : /// "reliable_mode" (Default: true): Whether to use reliable-mode transfers (true) or min-blocking (false)
38 : fhicl::Atom<bool> reliable_mode{fhicl::Name{"reliable_mode"}, fhicl::Comment{"Whether to use reliable-mode transfers (true) or min-blocking (false)"}, true};
39 : /// "fragment_size" (Default: 0x10000): The size of generated Fragments, in Fragment words
40 : fhicl::Atom<size_t> fragment_size{fhicl::Name{"fragment_size"}, fhicl::Comment{"The size of generated Fragments, in Fragment words"}, 0x10000};
41 : /// "send_timeout_us" (Default: 100000): The timeout for min-blocking mode sends
42 : fhicl::Atom<size_t> send_timeout_us{fhicl::Name{"send_timeout_us"}, fhicl::Comment{"The timeout for min-blocking mode sends"}, 100000};
43 : /// "transfer_buffer_count" (Default: 10): The number of buffers in the Transfer Plugins
44 : fhicl::Atom<size_t> transfer_buffer_count{fhicl::Name{"transfer_buffer_count"}, fhicl::Comment{"The number of buffers in the Transfer Plugins"}, 10};
45 : /// "event_buffer_count" (Default: 20): The number of "EventBuilder" buffers on the receiver end
46 : fhicl::Atom<size_t> event_buffer_count{fhicl::Name{"event_buffer_count"}, fhicl::Comment{"The number of \"EventBuilder\" buffers on the receiver end"}, 20};
47 : /// "event_buffer_timeout_us" (Default: 1000000): The timeout for "EventBuilder" buffers to be marked incomplete and abandoned
48 : fhicl::Atom<size_t> event_buffer_timeout_us{fhicl::Name{"event_buffer_timeout_us"}, fhicl::Comment{"The timeout for \"EventBuilder\" buffers to be marked incomplete and abandoned"}, 1000000};
49 : /// "default_transfer_ps" (Default: {}): The default ParameterSet to use for transfers. Will have transferPluginType, destination_rank, buffer_count and source_rank overridden. If max_fragment_size_words is unspecified, will be set using fragment_size
50 : fhicl::Table<artdaq::TransferInterface::Config> default_transfer_ps{fhicl::Name{"default_transfer_ps"}, fhicl::Comment{"The default ParameterSet to use for transfers. Will have transferPluginType, destination_rank, buffer_count and source_rank overridden. If max_fragment_size_words is unspecified, will be set using fragment_size"}};
51 : /// "transfer_to_use" (Default: "TCPSocket"): The name of the Transfer Plugin to use
52 : fhicl::Atom<std::string> transfer_to_use{fhicl::Name{"transfer_to_use"}, fhicl::Comment{"The name of the Transfer Plugin to use"}, "TCPSocket"};
53 : };
54 :
55 : /// <summary>
56 : /// BrokenTransferTest Constructor
57 : /// </summary>
58 : /// <param name="ps">ParameterSet containing BrokenTransferTest configuration</param>
59 : BrokenTransferTest(const fhicl::ParameterSet& ps);
60 :
61 : /// <summary>
62 : /// Run the "Sender Paused" test
63 : /// </summary>
64 : void TestSenderPause();
65 : /// <summary>
66 : /// Run the "Receiver Paused" test
67 : /// </summary>
68 : void TestReceiverPause();
69 : /// <summary>
70 : /// Run the "Sender Reconnect" test
71 : /// </summary>
72 : void TestSenderReconnect();
73 : /// <summary>
74 : /// Run the "Receiver Reconnect" test
75 : /// </summary>
76 : /// <param name="send_throttle_factor">Amount of time Sender should wait, in units of 1/fragment_rate</param>
77 : void TestReceiverReconnect(int send_throttle_factor = 0);
78 :
79 : private:
80 : struct received_event
81 : {
82 : artdaq::FragmentPtr first_frag;
83 : artdaq::FragmentPtr second_frag;
84 : std::chrono::steady_clock::time_point open_time;
85 : };
86 :
87 : fhicl::ParameterSet make_transfer_ps_(int sender_rank, int receiver_rank, const std::string& name);
88 :
89 : void start_test_();
90 : void stop_test_();
91 :
92 : void do_sending_(int sender_rank);
93 : void do_receiving_(int sender_rank, int receiver_rank);
94 :
95 : void throttle_sender_(int sender_rank);
96 : artdaq::Fragment::sequence_id_t sequence_id_target_();
97 0 : void usleep_for_n_fragments_(size_t n)
98 : {
99 : // n Fragments * 1000000 us/s / fragment_rate_hz_ fragments/s ==> usecs for n Fragments
100 0 : usleep(n * 1000000 / fragment_rate_hz_);
101 0 : }
102 0 : void usleep_for_n_buffer_epochs_(size_t n)
103 : {
104 0 : usleep_for_n_fragments_(n * (event_buffer_count_ + transfer_buffer_count_));
105 0 : }
106 :
107 : std::string fm_(double data, const std::string& units, int logt = 0);
108 :
109 : boost::thread sender_threads_[2];
110 : boost::thread receiver_threads_[2];
111 :
112 : std::array<std::atomic<bool>, 2> sender_ready_;
113 : std::array<std::atomic<bool>, 2> receiver_ready_;
114 :
115 : std::array<std::atomic<artdaq::Fragment::sequence_id_t>, 2> sender_current_fragment_;
116 : std::array<std::atomic<int>, 2> sender_tokens_;
117 :
118 : fhicl::ParameterSet ps_;
119 :
120 : std::chrono::steady_clock::time_point test_start_time_; ///< Tests are synchronized by time/rate
121 : std::chrono::steady_clock::time_point test_end_time_;
122 : std::atomic<bool> test_end_requested_;
123 : std::atomic<size_t> fragment_rate_hz_;
124 : std::atomic<bool> pause_first_sender_;
125 : std::atomic<bool> pause_receiver_;
126 : std::atomic<bool> kill_first_sender_;
127 : std::atomic<bool> kill_receiver_;
128 :
129 : std::atomic<bool> reliable_mode_;
130 : size_t fragment_size_;
131 : size_t send_timeout_us_;
132 :
133 : std::map<artdaq::Fragment::sequence_id_t, received_event> event_buffer_;
134 : std::set<artdaq::Fragment::sequence_id_t> timeout_events_;
135 : std::set<artdaq::Fragment::sequence_id_t> complete_events_;
136 : size_t transfer_buffer_count_;
137 : size_t event_buffer_count_;
138 : size_t event_buffer_timeout_us_;
139 : int send_throttle_us_;
140 : std::mutex event_buffer_mutex_;
141 : std::condition_variable event_buffer_cv_;
142 : };
143 : } // namespace artdaqtest
144 :
145 : #endif // artdaq_test_TransferPlugins_BrokenTransferTest_hh
|