Line data Source code
1 : #define TRACE_NAME "RequestSender"
2 :
3 : #include <boost/program_options.hpp>
4 : #include <memory>
5 : #include <thread>
6 :
7 : #include "artdaq-core/Utilities/configureMessageFacility.hh"
8 : #include "artdaq/Application/LoadParameterSet.hh"
9 : #include "artdaq/DAQrate/detail/RequestReceiver.hh"
10 : #include "artdaq/DAQrate/detail/RequestSender.hh"
11 :
12 : #include "fhiclcpp/ParameterSet.h"
13 : #include "fhiclcpp/types/Atom.h"
14 : #include "fhiclcpp/types/Comment.h"
15 : #include "fhiclcpp/types/Name.h"
16 : #include "fhiclcpp/types/TableFragment.h"
17 :
18 0 : int main(int argc, char* argv[])
19 : try
20 : {
21 0 : artdaq::configureMessageFacility("RequestSender");
22 :
23 : struct Config
24 : {
25 : fhicl::TableFragment<artdaq::RequestSender::Config> senderConfig;
26 : fhicl::Atom<bool> use_receiver{fhicl::Name{"use_receiver"}, fhicl::Comment{"Whether to setup a RequestReceiver to verify that requests are being sent"}, false};
27 : fhicl::Atom<size_t> receiver_timeout_ms{fhicl::Name{"recevier_timeout_ms"}, fhicl::Comment{"Amount of time to wait for the receiver to receive a request message"}, 1000};
28 : fhicl::Table<artdaq::RequestReceiver::Config> receiver_config{fhicl::Name{"receiver_config"}, fhicl::Comment{"Configuration for RequestReceiver, if used"}};
29 : fhicl::Atom<int> num_requests{fhicl::Name{"num_requests"}, fhicl::Comment{"Number of requests to send, 0 sends until interrupted"}, 1};
30 : fhicl::Atom<double> request_rate{fhicl::Name{"request_rate"}, fhicl::Comment{"Rate at which to send requests, in Hz"}, 1.0};
31 : fhicl::Atom<artdaq::Fragment::sequence_id_t> starting_sequence_id{fhicl::Name{"starting_sequence_id"}, fhicl::Comment{"Sequence ID of first request"}, 1};
32 : fhicl::Atom<artdaq::Fragment::sequence_id_t> sequence_id_scale{fhicl::Name{"sequence_id_scale"}, fhicl::Comment{"Amount to increment Sequence ID for each request"}, 1};
33 : fhicl::Atom<artdaq::Fragment::timestamp_t> starting_timestamp{fhicl::Name{"starting_timestamp"}, fhicl::Comment{"Timestamp of first request"}, 1};
34 : fhicl::Atom<artdaq::Fragment::timestamp_t> timestamp_scale{fhicl::Name{"timestamp_scale"}, fhicl::Comment{"Amount to increment timestamp for each request"}, 1};
35 : };
36 :
37 0 : auto pset = LoadParameterSet<Config>(argc, argv, "sender", "This test application sends Data Request messages and optionally receives them to detect issues in the network transport");
38 :
39 0 : fhicl::ParameterSet tempPset;
40 0 : if (pset.has_key("daq"))
41 : {
42 0 : fhicl::ParameterSet daqPset = pset.get<fhicl::ParameterSet>("daq");
43 0 : for (auto& name : daqPset.get_pset_names())
44 : {
45 0 : auto thisPset = daqPset.get<fhicl::ParameterSet>(name);
46 0 : if (thisPset.has_key("send_requests"))
47 : {
48 0 : tempPset = thisPset;
49 : }
50 0 : }
51 0 : }
52 : else
53 : {
54 0 : tempPset = pset;
55 : }
56 :
57 0 : int rc = 0;
58 :
59 0 : artdaq::RequestSender sender(tempPset);
60 :
61 0 : std::unique_ptr<artdaq::RequestReceiver> receiver(nullptr);
62 0 : std::shared_ptr<artdaq::RequestBuffer> request_buffer(nullptr);
63 0 : int num_requests = tempPset.get<int>("num_requests", 1);
64 0 : if (num_requests == 0) num_requests = std::numeric_limits<int>::max();
65 0 : if (tempPset.get<bool>("use_receiver", false))
66 : {
67 0 : auto receiver_pset = tempPset.get<fhicl::ParameterSet>("request_receiver", fhicl::ParameterSet());
68 0 : request_buffer = std::make_shared<artdaq::RequestBuffer>(receiver_pset.get<artdaq::Fragment::sequence_id_t>("request_increment", 1));
69 0 : receiver = std::make_unique<artdaq::RequestReceiver>(receiver_pset, request_buffer);
70 0 : receiver->startRequestReception();
71 0 : }
72 :
73 0 : auto seq = tempPset.get<artdaq::Fragment::sequence_id_t>("starting_sequence_id", 1);
74 0 : auto seq_scale = tempPset.get<artdaq::Fragment::sequence_id_t>("sequence_id_scale", 1);
75 0 : auto ts = tempPset.get<artdaq::Fragment::timestamp_t>("starting_timestamp", 1);
76 0 : auto ts_scale = tempPset.get<artdaq::Fragment::timestamp_t>("timestamp_scale", 1);
77 0 : auto tmo = tempPset.get<size_t>("recevier_timeout_ms", 1000);
78 0 : auto rate = tempPset.get<double>("request_rate", 1.0);
79 0 : if (rate <= 0) rate = std::numeric_limits<double>::max();
80 :
81 0 : auto sending_start = std::chrono::steady_clock::now();
82 :
83 0 : for (auto ii = 0; ii < num_requests; ++ii)
84 : {
85 0 : TLOG(TLVL_INFO) << "Sending request " << ii << " of " << num_requests << " with sequence id " << seq;
86 0 : sender.AddRequest(seq, ts);
87 0 : sender.SendRequest();
88 :
89 0 : if (request_buffer)
90 : {
91 0 : auto start_time = std::chrono::steady_clock::now();
92 0 : bool recvd = false;
93 0 : TLOG(TLVL_INFO) << "Starting receive loop for request " << ii;
94 0 : while (!recvd && artdaq::TimeUtils::GetElapsedTimeMilliseconds(start_time) < tmo)
95 : {
96 0 : auto reqs = request_buffer->GetRequests();
97 0 : if (reqs.count(seq) != 0u)
98 : {
99 0 : TLOG(TLVL_INFO) << "Received Request for Sequence ID " << seq << ", timestamp " << reqs[seq];
100 0 : request_buffer->RemoveRequest(seq);
101 0 : sender.RemoveRequest(seq);
102 0 : recvd = true;
103 : }
104 : else
105 : {
106 0 : usleep(10000);
107 : }
108 0 : }
109 0 : if (artdaq::TimeUtils::GetElapsedTimeMilliseconds(start_time) >= tmo)
110 : {
111 0 : TLOG(TLVL_ERROR) << "Timeout elapsed in requestSender";
112 0 : return -2;
113 : }
114 : }
115 :
116 0 : seq += seq_scale;
117 0 : ts += ts_scale;
118 0 : auto target = sending_start + std::chrono::microseconds(static_cast<int>((ii + 1) * 1000000 / rate));
119 0 : auto now = std::chrono::steady_clock::now();
120 0 : if (now < target)
121 : {
122 0 : std::this_thread::sleep_until(target);
123 : }
124 : }
125 :
126 0 : if (request_buffer)
127 : {
128 0 : auto recvd = receiver->GetReceivedMessageCount();
129 0 : auto sent = sender.GetSentMessageCount();
130 0 : if (recvd != sent)
131 : {
132 0 : TLOG(TLVL_ERROR) << "Receiver reports reception of " << recvd << " messages, when sender reports sending " << sent << "!";
133 0 : rc = -1;
134 : }
135 : }
136 :
137 0 : return rc;
138 0 : }
139 0 : catch (...)
140 : {
141 0 : return -1;
142 0 : }
|