Line data Source code
1 : #include "cetlib/PluginTypeDeducer.h"
2 : #include "fhiclcpp/ParameterSet.h"
3 : #include "fhiclcpp/types/ConfigurationTable.h"
4 :
5 : #include "cetlib/compiler_macros.h"
6 : #include "messagefacility/MessageLogger/MessageLogger.h"
7 : #include "messagefacility/MessageService/ELdestination.h"
8 : #include "messagefacility/Utilities/ELseverityLevel.h"
9 : #include "messagefacility/Utilities/exception.h"
10 :
11 : // C/C++ includes
12 : #include <arpa/inet.h>
13 : #include <ifaddrs.h>
14 : #include <netdb.h>
15 : #include <netinet/in.h>
16 : #include <algorithm>
17 : #include <fstream>
18 : #include <iostream>
19 : #include <memory>
20 : #include <mutex>
21 : #include "mfextensions/Receivers/detail/TCPConnect.hh"
22 :
23 : #define TRACE_NAME "UDP_mfPlugin"
24 : #include "trace.h"
25 :
26 : // Boost includes
27 : #include <boost/algorithm/string.hpp>
28 :
29 : namespace mfplugins {
30 : using mf::ErrorObj;
31 : using mf::service::ELdestination;
32 :
33 : /// <summary>
34 : /// Message Facility UDP Streamer Destination
35 : /// Formats messages into a delimited string and sends via UDP
36 : /// </summary>
37 : class ELUDP : public ELdestination
38 : {
39 : public:
40 : /**
41 : * \brief Configuration Parameters for ELUDP
42 : */
43 : struct Config
44 : {
45 : /// ELDestination common config parameters
46 : fhicl::TableFragment<ELdestination::Config> elDestConfig;
47 : /// "error_turnoff_threshold" (Default: 0): Number of errors before turning off destination (default: 0, don't turn
48 : /// off)"
49 : fhicl::Atom<int> error_max = fhicl::Atom<int>{
50 : fhicl::Name{"error_turnoff_threshold"},
51 : fhicl::Comment{"Number of errors before turning off destination (default: 0, don't turn off)"}, 0};
52 : /// "error_report_backoff_factor" (Default: 100): Print an error message every N errors
53 : fhicl::Atom<int> error_report = fhicl::Atom<int>{fhicl::Name{"error_report_backoff_factor"},
54 : fhicl::Comment{"Print an error message every N errors"}, 100};
55 : /// "host" (Default: "227.128.12.27"): Address to send messages to
56 : fhicl::Atom<std::string> host =
57 : fhicl::Atom<std::string>{fhicl::Name{"host"}, fhicl::Comment{"Address to send messages to"}, "227.128.12.27"};
58 : /// "port" (Default: 5140): Port to send messages to
59 : fhicl::Atom<int> port = fhicl::Atom<int>{fhicl::Name{"port"}, fhicl::Comment{"Port to send messages to"}, 5140};
60 : /// "multicast_enabled" (Default: false): Whether messages should be sent via multicast
61 : fhicl::Atom<bool> multicast_enabled = fhicl::Atom<bool>{
62 : fhicl::Name{"multicast_enabled"}, fhicl::Comment{"Whether messages should be sent via multicast"}, false};
63 : /// "multicast_interface_ip" (Default: "0.0.0.0"): Use this hostname for multicast output (to assign to the proper
64 : /// NIC)
65 : fhicl::Atom<std::string> output_address = fhicl::Atom<std::string>{
66 : fhicl::Name{"multicast_interface_ip"},
67 : fhicl::Comment{"Use this hostname for multicast output(to assign to the proper NIC)"}, "0.0.0.0"};
68 : /// filename_delimit (Default: "/"): Grab path after this. "/srcs/" /x/srcs/y/z.cc => y/z.cc
69 : fhicl::Atom<std::string> filename_delimit =
70 : fhicl::Atom<std::string>{fhicl::Name{"filename_delimit"},
71 : fhicl::Comment{"Grab path after this. \"/srcs/\" /x/srcs/y/z.cc => y/z.cc. NOTE: only works if full filename is given to this plugin (based on which mf::<method> is used)."}, "/"};
72 : };
73 : /// Used for ParameterSet validation
74 : using Parameters = fhicl::WrappedTable<Config>;
75 :
76 : public:
77 : /// <summary>
78 : /// ELUDP Constructor
79 : /// </summary>
80 : /// <param name="pset">ParameterSet used to configure ELUDP</param>
81 : ELUDP(Parameters const& pset);
82 :
83 : /**
84 : * \brief Fill the "Prefix" portion of the message
85 : * \param o Output stringstream
86 : * \param msg MessageFacility object containing header information
87 : */
88 : void fillPrefix(std::ostringstream& o, const ErrorObj& msg) override;
89 :
90 : /**
91 : * \brief Fill the "User Message" portion of the message
92 : * \param o Output stringstream
93 : * \param msg MessageFacility object containing header information
94 : */
95 : void fillUsrMsg(std::ostringstream& o, const ErrorObj& msg) override;
96 :
97 : /**
98 : * \brief Fill the "Suffix" portion of the message (Unused)
99 : */
100 0 : void fillSuffix(std::ostringstream& /*unused*/, const ErrorObj& /*msg*/) override {}
101 :
102 : /**
103 : * \brief Serialize a MessageFacility message to the output
104 : * \param o Stringstream object containing message data
105 : * \param e MessageFacility object containing header information
106 : */
107 : void routePayload(const std::ostringstream& o, const ErrorObj& e) override;
108 :
109 : private:
110 : void reconnect_();
111 :
112 : // Parameters
113 : int error_report_backoff_factor_;
114 : int error_max_;
115 : std::string host_;
116 : int port_;
117 : bool multicast_enabled_;
118 : std::string multicast_out_addr_;
119 :
120 : int message_socket_;
121 : struct sockaddr_in message_addr_;
122 :
123 : // Other stuff
124 : int consecutive_success_count_;
125 : int error_count_;
126 : int next_error_report_;
127 : int seqNum_;
128 :
129 : int64_t pid_;
130 : std::string hostname_;
131 : std::string hostaddr_;
132 : std::string app_;
133 : std::string filename_delimit_;
134 : };
135 :
136 : // END DECLARATION
137 : //======================================================================
138 : // BEGIN IMPLEMENTATION
139 :
140 : //======================================================================
141 : // ELUDP c'tor
142 : //======================================================================
143 :
144 0 : ELUDP::ELUDP(Parameters const& pset)
145 0 : : ELdestination(pset().elDestConfig()), error_report_backoff_factor_(pset().error_report()), error_max_(pset().error_max()), host_(pset().host()), port_(pset().port()), multicast_enabled_(pset().multicast_enabled()), multicast_out_addr_(pset().output_address()), message_socket_(-1), consecutive_success_count_(0), error_count_(0), next_error_report_(1), seqNum_(0), pid_(static_cast<int64_t>(getpid())), filename_delimit_(pset().filename_delimit())
146 : {
147 : // hostname
148 : char hostname_c[1024];
149 0 : hostname_ = (gethostname(hostname_c, 1023) == 0) ? hostname_c : "Unkonwn Host";
150 :
151 : // host ip address
152 0 : hostent* host = nullptr;
153 0 : host = gethostbyname(hostname_c);
154 :
155 0 : if (host != nullptr)
156 : {
157 : // ip address from hostname if the entry exists in /etc/hosts
158 0 : char* ip = inet_ntoa(*reinterpret_cast<struct in_addr*>(host->h_addr)); // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast,cppcoreguidelines-pro-bounds-pointer-arithmetic)
159 0 : hostaddr_ = ip;
160 : }
161 : else
162 : {
163 : // enumerate all network interfaces
164 0 : struct ifaddrs* ifAddrStruct = nullptr;
165 0 : struct ifaddrs* ifa = nullptr;
166 0 : void* tmpAddrPtr = nullptr;
167 :
168 0 : if (getifaddrs(&ifAddrStruct) != 0)
169 : {
170 : // failed to get addr struct
171 0 : hostaddr_ = "127.0.0.1";
172 : }
173 : else
174 : {
175 : // iterate through all interfaces
176 0 : for (ifa = ifAddrStruct; ifa != nullptr; ifa = ifa->ifa_next)
177 : {
178 0 : if (ifa->ifa_addr->sa_family == AF_INET)
179 : {
180 : // a valid IPv4 addres
181 0 : tmpAddrPtr = &(reinterpret_cast<struct sockaddr_in*>(ifa->ifa_addr)->sin_addr); // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
182 : char addressBuffer[INET_ADDRSTRLEN];
183 0 : inet_ntop(AF_INET, tmpAddrPtr, addressBuffer, INET_ADDRSTRLEN);
184 0 : hostaddr_ = addressBuffer;
185 : }
186 :
187 0 : else if (ifa->ifa_addr->sa_family == AF_INET6)
188 : {
189 : // a valid IPv6 address
190 0 : tmpAddrPtr = &(reinterpret_cast<struct sockaddr_in6*>(ifa->ifa_addr)->sin6_addr); // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
191 : char addressBuffer[INET6_ADDRSTRLEN];
192 0 : inet_ntop(AF_INET6, tmpAddrPtr, addressBuffer, INET6_ADDRSTRLEN);
193 0 : hostaddr_ = addressBuffer;
194 : }
195 :
196 : // find first non-local address
197 0 : if (!hostaddr_.empty() && (hostaddr_ != "127.0.0.1") && (hostaddr_ != "::1"))
198 : {
199 0 : break;
200 : }
201 : }
202 :
203 0 : if (hostaddr_.empty())
204 : { // failed to find anything
205 0 : hostaddr_ = "127.0.0.1";
206 : }
207 : }
208 : }
209 :
210 : #if 0
211 : // get process name from '/proc/pid/exe'
212 : std::string exe;
213 : std::ostringstream pid_ostr;
214 : pid_ostr << "/proc/" << pid_ << "/exe";
215 : exe = realpath(pid_ostr.str().c_str(), NULL);
216 :
217 : size_t end = exe.find('\0');
218 : size_t start = exe.find_last_of('/', end);
219 :
220 : app_ = exe.substr(start + 1, end - start - 1);
221 : #else
222 : // get process name from '/proc/pid/cmdline'
223 0 : std::stringstream ss;
224 0 : ss << "//proc//" << pid_ << "//cmdline";
225 0 : std::ifstream procfile{ss.str().c_str()};
226 :
227 0 : std::string procinfo;
228 :
229 0 : if (procfile.is_open())
230 : {
231 0 : procfile >> procinfo;
232 0 : procfile.close();
233 : }
234 :
235 0 : size_t end = procinfo.find('\0');
236 0 : size_t start = procinfo.find_last_of('/', end);
237 :
238 0 : app_ = procinfo.substr(start + 1, end - start - 1);
239 : #endif
240 0 : }
241 :
242 0 : void ELUDP::reconnect_()
243 : {
244 : static std::mutex mutex;
245 0 : std::lock_guard<std::mutex> lk(mutex);
246 0 : if (message_socket_ == -1)
247 : {
248 0 : message_socket_ = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
249 0 : if (message_socket_ < 0)
250 : {
251 0 : TLOG(TLVL_ERROR) << "I failed to create the socket for sending Data messages! err=" << strerror(errno);
252 0 : exit(1);
253 : }
254 0 : int sts = ResolveHost(host_.c_str(), port_, message_addr_);
255 0 : if (sts == -1)
256 : {
257 0 : TLOG(TLVL_ERROR) << "Unable to resolve Data message address, err=" << strerror(errno);
258 0 : exit(1);
259 : }
260 :
261 0 : if (multicast_out_addr_ == "0.0.0.0")
262 : {
263 0 : multicast_out_addr_.reserve(HOST_NAME_MAX);
264 0 : sts = gethostname(&multicast_out_addr_[0], HOST_NAME_MAX);
265 0 : if (sts < 0)
266 : {
267 0 : TLOG(TLVL_ERROR) << "Could not get current hostname, err=" << strerror(errno);
268 0 : exit(1);
269 : }
270 : }
271 :
272 0 : if (multicast_out_addr_ != "localhost")
273 : {
274 : struct in_addr addr;
275 0 : sts = GetInterfaceForNetwork(multicast_out_addr_.c_str(), addr);
276 : // sts = ResolveHost(multicast_out_addr_.c_str(), addr);
277 0 : if (sts == -1)
278 : {
279 0 : TLOG(TLVL_ERROR) << "Unable to resolve multicast interface address, err=" << strerror(errno);
280 0 : exit(1);
281 : }
282 :
283 0 : if (setsockopt(message_socket_, IPPROTO_IP, IP_MULTICAST_IF, &addr, sizeof(addr)) == -1)
284 : {
285 0 : TLOG(TLVL_ERROR) << "Cannot set outgoing interface, err=" << strerror(errno);
286 0 : exit(1);
287 : }
288 : }
289 0 : int yes = 1;
290 0 : if (setsockopt(message_socket_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)) < 0)
291 : {
292 0 : TLOG(TLVL_ERROR) << "Unable to enable port reuse on message socket, err=" << strerror(errno);
293 0 : exit(1);
294 : }
295 0 : if (setsockopt(message_socket_, IPPROTO_IP, IP_MULTICAST_LOOP, &yes, sizeof(yes)) < 0)
296 : {
297 0 : TLOG(TLVL_ERROR) << "Unable to enable multicast loopback on message socket, err=" << strerror(errno);
298 0 : exit(1);
299 : }
300 0 : if (setsockopt(message_socket_, SOL_SOCKET, SO_BROADCAST, &yes, sizeof(yes)) == -1)
301 : {
302 0 : TLOG(TLVL_ERROR) << "Cannot set message socket to broadcast, err=" << strerror(errno);
303 0 : exit(1);
304 : }
305 : }
306 0 : }
307 :
308 : //======================================================================
309 : // Message prefix filler ( overriddes ELdestination::fillPrefix )
310 : //======================================================================
311 0 : void ELUDP::fillPrefix(std::ostringstream& oss, const ErrorObj& msg)
312 : {
313 0 : const auto& xid = msg.xid();
314 :
315 0 : auto id = xid.id();
316 0 : auto module = xid.module();
317 0 : auto app = app_;
318 0 : std::replace(id.begin(), id.end(), '|', '!');
319 0 : std::replace(app.begin(), app.end(), '|', '!');
320 0 : std::replace(module.begin(), module.end(), '|', '!');
321 :
322 0 : oss << format_.timestamp(msg.timestamp()) << "|"; // timestamp
323 0 : oss << std::to_string(++seqNum_) << "|"; // sequence number
324 0 : oss << hostname_ << "|"; // host name
325 0 : oss << hostaddr_ << "|"; // host address
326 0 : oss << xid.severity().getName() << "|"; // severity
327 0 : oss << id << "|"; // category
328 0 : oss << app << "|"; // application
329 0 : oss << pid_ << "|";
330 0 : oss << mf::GetIteration() << "|"; // run/event no
331 :
332 0 : oss << module << "|"; // module name
333 0 : if (filename_delimit_.empty())
334 : {
335 0 : oss << msg.filename();
336 : }
337 0 : else if (filename_delimit_.size() == 1) // for a single character (i.e '/'), search in reverse.
338 : {
339 0 : oss << (strrchr(&msg.filename()[0], filename_delimit_[0]) != nullptr // NOLINT(cppcoreguidelines-pro-bounds-pointer-arithmetic)
340 0 : ? strrchr(&msg.filename()[0], filename_delimit_[0]) + 1 // NOLINT(cppcoreguidelines-pro-bounds-pointer-arithmetic)
341 0 : : msg.filename());
342 : }
343 : else
344 : {
345 0 : const char* cp = strstr(&msg.filename()[0], &filename_delimit_[0]); // NOLINT(cppcoreguidelines-pro-bounds-pointer-arithmetic)
346 0 : if (cp != nullptr)
347 : {
348 : // make sure to remove a part that ends with '/'
349 0 : cp += filename_delimit_.size() - 1; // NOLINT(cppcoreguidelines-pro-bounds-pointer-arithmetic)
350 0 : while (*cp && *cp != '/') ++cp;
351 0 : ++cp; // increment past '/'
352 0 : oss << cp;
353 : }
354 : else
355 0 : oss << msg.filename();
356 : }
357 0 : oss << "|" << std::to_string(msg.lineNumber()) << "|";
358 0 : }
359 :
360 : //======================================================================
361 : // Message filler ( overriddes ELdestination::fillUsrMsg )
362 : //======================================================================
363 0 : void ELUDP::fillUsrMsg(std::ostringstream& oss, const ErrorObj& msg)
364 : {
365 0 : std::ostringstream tmposs;
366 : // Print the contents.
367 0 : for (auto const& val : msg.items())
368 : {
369 0 : tmposs << val;
370 : }
371 :
372 : // remove leading "\n" if present
373 0 : const std::string& usrMsg = tmposs.str().compare(0, 1, "\n") == 0 ? tmposs.str().erase(0, 1) : tmposs.str();
374 :
375 0 : oss << usrMsg;
376 0 : }
377 :
378 : //======================================================================
379 : // Message router ( overriddes ELdestination::routePayload )
380 : //======================================================================
381 0 : void ELUDP::routePayload(const std::ostringstream& oss, const ErrorObj& /*msg*/)
382 : {
383 0 : if (message_socket_ == -1)
384 : {
385 0 : reconnect_();
386 : }
387 0 : if (error_count_ < error_max_ || error_max_ == 0)
388 : {
389 : char str[INET_ADDRSTRLEN];
390 0 : inet_ntop(AF_INET, &(message_addr_.sin_addr), str, INET_ADDRSTRLEN);
391 :
392 0 : auto string = "UDPMFMESSAGE" + std::to_string(pid_) + "|" + oss.str();
393 0 : auto sts = sendto(message_socket_, string.c_str(), string.size(), 0, reinterpret_cast<struct sockaddr*>(&message_addr_), // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
394 : sizeof(message_addr_));
395 :
396 0 : if (sts < 0)
397 : {
398 0 : consecutive_success_count_ = 0;
399 0 : ++error_count_;
400 0 : if (error_count_ == next_error_report_)
401 : {
402 0 : TLOG(TLVL_ERROR) << "Error sending message " << seqNum_ << " to " << host_ << ", errno=" << errno << " ("
403 0 : << strerror(errno) << ")";
404 0 : next_error_report_ *= error_report_backoff_factor_;
405 : }
406 : }
407 : else
408 : {
409 0 : ++consecutive_success_count_;
410 0 : if (consecutive_success_count_ >= 5)
411 : {
412 0 : error_count_ = 0;
413 0 : next_error_report_ = 1;
414 : }
415 : }
416 0 : }
417 0 : }
418 : } // end namespace mfplugins
419 :
420 : //======================================================================
421 : //
422 : // makePlugin function
423 : //
424 : //======================================================================
425 :
426 : #ifndef EXTERN_C_FUNC_DECLARE_START
427 : #define EXTERN_C_FUNC_DECLARE_START extern "C" {
428 : #endif
429 :
430 : EXTERN_C_FUNC_DECLARE_START
431 0 : auto makePlugin(const std::string& /*unused*/, const fhicl::ParameterSet& pset)
432 : {
433 0 : return std::make_unique<mfplugins::ELUDP>(pset);
434 : }
435 : }
436 :
437 0 : DEFINE_BASIC_PLUGINTYPE_FUNC(mf::service::ELdestination)
|