Line data Source code
1 : #include "artdaq/DAQdata/Globals.hh" // Before trace.h gets included in ConcurrentQueue (from GlobalQueue)
2 : #define TRACE_NAME (app_name + "_TokenSender").c_str()
3 : #include <dlfcn.h>
4 : #include <chrono>
5 : #include <cstring>
6 : #include <fstream>
7 : #include <iomanip>
8 : #include <sstream>
9 : #include <utility>
10 : #include "artdaq/DAQrate/detail/TokenSender.hh"
11 :
12 : #include "artdaq-core/Core/StatisticsCollection.hh"
13 : #include "artdaq/DAQdata/TCPConnect.hh"
14 : #include "artdaq/DAQrate/detail/RoutingPacket.hh"
15 : #include "cetlib_except/exception.h"
16 :
17 : namespace artdaq {
18 0 : TokenSender::TokenSender(const fhicl::ParameterSet& pset)
19 0 : : initialized_(false)
20 0 : , send_routing_tokens_(pset.get<bool>("use_routing_manager", false))
21 0 : , token_port_(pset.get<int>("routing_token_port", 35555))
22 0 : , token_socket_(-1)
23 0 : , token_address_(pset.get<std::string>("routing_manager_hostname", "localhost"))
24 0 : , tokens_sent_(0)
25 0 : , run_number_(0)
26 : {
27 0 : TLOG(TLVL_DEBUG + 32) << "TokenSender CONSTRUCTOR";
28 :
29 0 : setup_tokens_();
30 0 : TLOG(TLVL_DEBUG + 35) << "artdaq::TokenSender::TokenSender ctor - reader_thread_ initialized";
31 0 : initialized_ = true;
32 0 : }
33 :
34 0 : TokenSender::~TokenSender()
35 : {
36 0 : TLOG(TLVL_INFO) << "Shutting down TokenSender, token_socket_: " << token_socket_;
37 :
38 0 : if (token_socket_ != -1)
39 : {
40 0 : if (shutdown(token_socket_, 2) != 0 && errno == ENOTSOCK)
41 : {
42 0 : TLOG(TLVL_ERROR) << "Shutdown of token_socket_ resulted in ENOTSOCK. NOT Closing file descriptor!";
43 : }
44 : else
45 : {
46 0 : close(token_socket_);
47 : }
48 0 : token_socket_ = -1;
49 : }
50 0 : }
51 :
52 0 : void TokenSender::setup_tokens_()
53 : {
54 0 : if (send_routing_tokens_)
55 : {
56 0 : TLOG(TLVL_DEBUG + 32) << "Creating Routing Token sending socket";
57 0 : auto start_time = std::chrono::steady_clock::now();
58 0 : while (token_socket_ < 0 && TimeUtils::GetElapsedTime(start_time) < 30)
59 : {
60 0 : token_socket_ = TCPConnect(token_address_.c_str(), token_port_, 0, sizeof(detail::RoutingToken));
61 0 : if (token_socket_ < 0)
62 : {
63 0 : TLOG(TLVL_DEBUG + 33) << "Waited " << TimeUtils::GetElapsedTime(start_time) << " s for Routing Manager to open token socket";
64 0 : usleep(100000);
65 : }
66 : }
67 0 : if (token_socket_ < 0)
68 : {
69 0 : TLOG(TLVL_ERROR) << "I failed to create the socket for sending Routing Tokens! err=" << strerror(errno);
70 0 : exit(1);
71 : }
72 0 : TLOG(TLVL_INFO) << "Routing Token sending socket created successfully for address " << token_address_;
73 : }
74 0 : }
75 :
76 0 : void TokenSender::send_routing_token_(int nSlots, int run_number, int rank)
77 : {
78 0 : TLOG(TLVL_DEBUG + 33) << "send_routing_token_ called, send_routing_tokens_=" << std::boolalpha << send_routing_tokens_;
79 0 : if (!send_routing_tokens_)
80 : {
81 0 : return;
82 : }
83 0 : if (token_socket_ == -1)
84 : {
85 0 : setup_tokens_();
86 : }
87 : detail::RoutingToken token;
88 0 : token.header = TOKEN_MAGIC;
89 0 : token.rank = rank;
90 0 : token.new_slots_free = nSlots;
91 0 : token.run_number = run_number;
92 :
93 0 : TLOG(TLVL_DEBUG + 33) << "Sending RoutingToken to " << token_address_ << ":" << token_port_;
94 0 : size_t sts = 0;
95 0 : while (sts < sizeof(detail::RoutingToken))
96 : {
97 0 : auto res = send(token_socket_, reinterpret_cast<uint8_t*>(&token) + sts, sizeof(detail::RoutingToken) - sts, 0); // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast,cppcoreguidelines-pro-bounds-pointer-arithmetic)
98 0 : if (res < 0)
99 : {
100 0 : TLOG(TLVL_WARNING) << "Error on token_socket, reconnecting";
101 0 : close(token_socket_);
102 0 : token_socket_ = -1;
103 0 : sts = 0;
104 0 : setup_tokens_();
105 0 : continue;
106 0 : }
107 0 : sts += res;
108 : }
109 0 : tokens_sent_ += nSlots;
110 0 : TLOG(TLVL_DEBUG + 33) << "Done sending RoutingToken to " << token_address_ << ":" << token_port_;
111 : }
112 :
113 0 : void TokenSender::SendRoutingToken(int nSlots, int run_number, int rank)
114 : {
115 0 : while (!initialized_)
116 : {
117 0 : usleep(1000);
118 : }
119 0 : if (!send_routing_tokens_)
120 : {
121 0 : return;
122 : }
123 0 : boost::thread token([this, nSlots, run_number, rank] { send_routing_token_(nSlots, run_number, rank); });
124 0 : token.detach();
125 0 : usleep(0); // Give up time slice
126 0 : }
127 :
128 : } // namespace artdaq
|