LCOV - code coverage report
Current view: top level - /opt/artdaq/srcs/artdaq-mfextensions/mfextensions/Destinations - UDP_mfPlugin.cc (source / functions) Coverage Total Hit
Test: artdaq.info.cleaned Lines: 0.0 % 137 0
Test Date: 2025-09-04 00:45:34 Functions: 0.0 % 17 0

            Line data    Source code
#include "cetlib/PluginTypeDeducer.h"
#include "fhiclcpp/ParameterSet.h"
#include "fhiclcpp/types/ConfigurationTable.h"

#include "cetlib/compiler_macros.h"
#include "messagefacility/MessageLogger/MessageLogger.h"
#include "messagefacility/MessageService/ELdestination.h"
#include "messagefacility/Utilities/ELseverityLevel.h"
#include "messagefacility/Utilities/exception.h"

// C/C++ includes
#include <arpa/inet.h>
#include <ifaddrs.h>
#include <netdb.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>
#include <algorithm>
#include <cerrno>
#include <climits>
#include <cstdint>
#include <cstring>
#include <fstream>
#include <iostream>
#include <limits>
#include <memory>
#include <mutex>
#include <sstream>
#include <string>
#include "mfextensions/Receivers/detail/TCPConnect.hh"

#define TRACE_NAME "UDP_mfPlugin"
#include "trace.h"

// Boost includes
#include <boost/algorithm/string.hpp>
      29              : namespace mfplugins {
      30              : using mf::ErrorObj;
      31              : using mf::service::ELdestination;
      32              : 
      33              : /// <summary>
      34              : /// Message Facility UDP Streamer Destination
      35              : /// Formats messages into a delimited string and sends via UDP
      36              : /// </summary>
      37              : class ELUDP : public ELdestination
      38              : {
      39              : public:
      40              :         /**
      41              :          * \brief Configuration Parameters for ELUDP
      42              :          */
      43              :         struct Config
      44              :         {
      45              :                 /// ELDestination common config parameters
      46              :                 fhicl::TableFragment<ELdestination::Config> elDestConfig;
      47              :                 /// "error_turnoff_threshold" (Default: 0): Number of errors before turning off destination (default: 0, don't turn
      48              :                 /// off)"
      49              :                 fhicl::Atom<int> error_max = fhicl::Atom<int>{
      50              :                     fhicl::Name{"error_turnoff_threshold"},
      51              :                     fhicl::Comment{"Number of errors before turning off destination (default: 0, don't turn off)"}, 0};
      52              :                 /// "error_report_backoff_factor" (Default: 100): Print an error message every N errors
      53              :                 fhicl::Atom<int> error_report = fhicl::Atom<int>{fhicl::Name{"error_report_backoff_factor"},
      54              :                                                                  fhicl::Comment{"Print an error message every N errors"}, 100};
      55              :                 /// "host" (Default: "227.128.12.27"): Address to send messages to
      56              :                 fhicl::Atom<std::string> host =
      57              :                     fhicl::Atom<std::string>{fhicl::Name{"host"}, fhicl::Comment{"Address to send messages to"}, "227.128.12.27"};
      58              :                 /// "port" (Default: 5140): Port to send messages to
      59              :                 fhicl::Atom<int> port = fhicl::Atom<int>{fhicl::Name{"port"}, fhicl::Comment{"Port to send messages to"}, 5140};
      60              :                 /// "multicast_enabled" (Default: false): Whether messages should be sent via multicast
      61              :                 fhicl::Atom<bool> multicast_enabled = fhicl::Atom<bool>{
      62              :                     fhicl::Name{"multicast_enabled"}, fhicl::Comment{"Whether messages should be sent via multicast"}, false};
      63              :                 /// "multicast_interface_ip" (Default: "0.0.0.0"): Use this hostname for multicast output (to assign to the proper
      64              :                 /// NIC)
      65              :                 fhicl::Atom<std::string> output_address = fhicl::Atom<std::string>{
      66              :                     fhicl::Name{"multicast_interface_ip"},
      67              :                     fhicl::Comment{"Use this hostname for multicast output(to assign to the proper NIC)"}, "0.0.0.0"};
      68              :                 /// filename_delimit (Default: "/"): Grab path after this. "/srcs/" /x/srcs/y/z.cc => y/z.cc
      69              :                 fhicl::Atom<std::string> filename_delimit =
      70              :                     fhicl::Atom<std::string>{fhicl::Name{"filename_delimit"},
      71              :                                              fhicl::Comment{"Grab path after this. \"/srcs/\" /x/srcs/y/z.cc => y/z.cc. NOTE: only works if full filename is given to this plugin (based on which mf::<method> is used)."}, "/"};
      72              :         };
      73              :         /// Used for ParameterSet validation
      74              :         using Parameters = fhicl::WrappedTable<Config>;
      75              : 
      76              : public:
      77              :         /// <summary>
      78              :         /// ELUDP Constructor
      79              :         /// </summary>
      80              :         /// <param name="pset">ParameterSet used to configure ELUDP</param>
      81              :         ELUDP(Parameters const& pset);
      82              : 
      83              :         /**
      84              :          * \brief Fill the "Prefix" portion of the message
      85              :          * \param o Output stringstream
      86              :          * \param msg MessageFacility object containing header information
      87              :          */
      88              :         void fillPrefix(std::ostringstream& o, const ErrorObj& msg) override;
      89              : 
      90              :         /**
      91              :          * \brief Fill the "User Message" portion of the message
      92              :          * \param o Output stringstream
      93              :          * \param msg MessageFacility object containing header information
      94              :          */
      95              :         void fillUsrMsg(std::ostringstream& o, const ErrorObj& msg) override;
      96              : 
      97              :         /**
      98              :          * \brief Fill the "Suffix" portion of the message (Unused)
      99              :          */
     100            0 :         void fillSuffix(std::ostringstream& /*unused*/, const ErrorObj& /*msg*/) override {}
     101              : 
     102              :         /**
     103              :          * \brief Serialize a MessageFacility message to the output
     104              :          * \param o Stringstream object containing message data
     105              :          * \param e MessageFacility object containing header information
     106              :          */
     107              :         void routePayload(const std::ostringstream& o, const ErrorObj& e) override;
     108              : 
     109              : private:
     110              :         void reconnect_();
     111              : 
     112              :         // Parameters
     113              :         int error_report_backoff_factor_;
     114              :         int error_max_;
     115              :         std::string host_;
     116              :         int port_;
     117              :         bool multicast_enabled_;
     118              :         std::string multicast_out_addr_;
     119              : 
     120              :         int message_socket_;
     121              :         struct sockaddr_in message_addr_;
     122              : 
     123              :         // Other stuff
     124              :         int consecutive_success_count_;
     125              :         int error_count_;
     126              :         int next_error_report_;
     127              :         int seqNum_;
     128              : 
     129              :         int64_t pid_;
     130              :         std::string hostname_;
     131              :         std::string hostaddr_;
     132              :         std::string app_;
     133              :         std::string filename_delimit_;
     134              : };
     135              : 
     136              : // END DECLARATION
     137              : //======================================================================
     138              : // BEGIN IMPLEMENTATION
     139              : 
     140              : //======================================================================
     141              : // ELUDP c'tor
     142              : //======================================================================
     143              : 
     144            0 : ELUDP::ELUDP(Parameters const& pset)
     145            0 :     : ELdestination(pset().elDestConfig()), error_report_backoff_factor_(pset().error_report()), error_max_(pset().error_max()), host_(pset().host()), port_(pset().port()), multicast_enabled_(pset().multicast_enabled()), multicast_out_addr_(pset().output_address()), message_socket_(-1), consecutive_success_count_(0), error_count_(0), next_error_report_(1), seqNum_(0), pid_(static_cast<int64_t>(getpid())), filename_delimit_(pset().filename_delimit())
     146              : {
     147              :         // hostname
     148              :         char hostname_c[1024];
     149            0 :         hostname_ = (gethostname(hostname_c, 1023) == 0) ? hostname_c : "Unkonwn Host";
     150              : 
     151              :         // host ip address
     152            0 :         hostent* host = nullptr;
     153            0 :         host = gethostbyname(hostname_c);
     154              : 
     155            0 :         if (host != nullptr)
     156              :         {
     157              :                 // ip address from hostname if the entry exists in /etc/hosts
     158            0 :                 char* ip = inet_ntoa(*reinterpret_cast<struct in_addr*>(host->h_addr));  // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast,cppcoreguidelines-pro-bounds-pointer-arithmetic)
     159            0 :                 hostaddr_ = ip;
     160              :         }
     161              :         else
     162              :         {
     163              :                 // enumerate all network interfaces
     164            0 :                 struct ifaddrs* ifAddrStruct = nullptr;
     165            0 :                 struct ifaddrs* ifa = nullptr;
     166            0 :                 void* tmpAddrPtr = nullptr;
     167              : 
     168            0 :                 if (getifaddrs(&ifAddrStruct) != 0)
     169              :                 {
     170              :                         // failed to get addr struct
     171            0 :                         hostaddr_ = "127.0.0.1";
     172              :                 }
     173              :                 else
     174              :                 {
     175              :                         // iterate through all interfaces
     176            0 :                         for (ifa = ifAddrStruct; ifa != nullptr; ifa = ifa->ifa_next)
     177              :                         {
     178            0 :                                 if (ifa->ifa_addr->sa_family == AF_INET)
     179              :                                 {
     180              :                                         // a valid IPv4 addres
     181            0 :                                         tmpAddrPtr = &(reinterpret_cast<struct sockaddr_in*>(ifa->ifa_addr)->sin_addr);  // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
     182              :                                         char addressBuffer[INET_ADDRSTRLEN];
     183            0 :                                         inet_ntop(AF_INET, tmpAddrPtr, addressBuffer, INET_ADDRSTRLEN);
     184            0 :                                         hostaddr_ = addressBuffer;
     185              :                                 }
     186              : 
     187            0 :                                 else if (ifa->ifa_addr->sa_family == AF_INET6)
     188              :                                 {
     189              :                                         // a valid IPv6 address
     190            0 :                                         tmpAddrPtr = &(reinterpret_cast<struct sockaddr_in6*>(ifa->ifa_addr)->sin6_addr);  // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
     191              :                                         char addressBuffer[INET6_ADDRSTRLEN];
     192            0 :                                         inet_ntop(AF_INET6, tmpAddrPtr, addressBuffer, INET6_ADDRSTRLEN);
     193            0 :                                         hostaddr_ = addressBuffer;
     194              :                                 }
     195              : 
     196              :                                 // find first non-local address
     197            0 :                                 if (!hostaddr_.empty() && (hostaddr_ != "127.0.0.1") && (hostaddr_ != "::1"))
     198              :                                 {
     199            0 :                                         break;
     200              :                                 }
     201              :                         }
     202              : 
     203            0 :                         if (hostaddr_.empty())
     204              :                         {  // failed to find anything
     205            0 :                                 hostaddr_ = "127.0.0.1";
     206              :                         }
     207              :                 }
     208              :         }
     209              : 
     210              : #if 0
     211              :                 // get process name from '/proc/pid/exe'
     212              :                 std::string exe;
     213              :                 std::ostringstream pid_ostr;
     214              :                 pid_ostr << "/proc/" << pid_ << "/exe";
     215              :                 exe = realpath(pid_ostr.str().c_str(), NULL);
     216              : 
     217              :                 size_t end = exe.find('\0');
     218              :                 size_t start = exe.find_last_of('/', end);
     219              : 
     220              :                 app_ = exe.substr(start + 1, end - start - 1);
     221              : #else
     222              :         // get process name from '/proc/pid/cmdline'
     223            0 :         std::stringstream ss;
     224            0 :         ss << "//proc//" << pid_ << "//cmdline";
     225            0 :         std::ifstream procfile{ss.str().c_str()};
     226              : 
     227            0 :         std::string procinfo;
     228              : 
     229            0 :         if (procfile.is_open())
     230              :         {
     231            0 :                 procfile >> procinfo;
     232            0 :                 procfile.close();
     233              :         }
     234              : 
     235            0 :         size_t end = procinfo.find('\0');
     236            0 :         size_t start = procinfo.find_last_of('/', end);
     237              : 
     238            0 :         app_ = procinfo.substr(start + 1, end - start - 1);
     239              : #endif
     240            0 : }
     241              : 
     242            0 : void ELUDP::reconnect_()
     243              : {
     244              :         static std::mutex mutex;
     245            0 :         std::lock_guard<std::mutex> lk(mutex);
     246            0 :         if (message_socket_ == -1)
     247              :         {
     248            0 :                 message_socket_ = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
     249            0 :                 if (message_socket_ < 0)
     250              :                 {
     251            0 :                         TLOG(TLVL_ERROR) << "I failed to create the socket for sending Data messages! err=" << strerror(errno);
     252            0 :                         exit(1);
     253              :                 }
     254            0 :                 int sts = ResolveHost(host_.c_str(), port_, message_addr_);
     255            0 :                 if (sts == -1)
     256              :                 {
     257            0 :                         TLOG(TLVL_ERROR) << "Unable to resolve Data message address, err=" << strerror(errno);
     258            0 :                         exit(1);
     259              :                 }
     260              : 
     261            0 :                 if (multicast_out_addr_ == "0.0.0.0")
     262              :                 {
     263            0 :                         multicast_out_addr_.reserve(HOST_NAME_MAX);
     264            0 :                         sts = gethostname(&multicast_out_addr_[0], HOST_NAME_MAX);
     265            0 :                         if (sts < 0)
     266              :                         {
     267            0 :                                 TLOG(TLVL_ERROR) << "Could not get current hostname,  err=" << strerror(errno);
     268            0 :                                 exit(1);
     269              :                         }
     270              :                 }
     271              : 
     272            0 :                 if (multicast_out_addr_ != "localhost")
     273              :                 {
     274              :                         struct in_addr addr;
     275            0 :                         sts = GetInterfaceForNetwork(multicast_out_addr_.c_str(), addr);
     276              :                         // sts = ResolveHost(multicast_out_addr_.c_str(), addr);
     277            0 :                         if (sts == -1)
     278              :                         {
     279            0 :                                 TLOG(TLVL_ERROR) << "Unable to resolve multicast interface address, err=" << strerror(errno);
     280            0 :                                 exit(1);
     281              :                         }
     282              : 
     283            0 :                         if (setsockopt(message_socket_, IPPROTO_IP, IP_MULTICAST_IF, &addr, sizeof(addr)) == -1)
     284              :                         {
     285            0 :                                 TLOG(TLVL_ERROR) << "Cannot set outgoing interface, err=" << strerror(errno);
     286            0 :                                 exit(1);
     287              :                         }
     288              :                 }
     289            0 :                 int yes = 1;
     290            0 :                 if (setsockopt(message_socket_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)) < 0)
     291              :                 {
     292            0 :                         TLOG(TLVL_ERROR) << "Unable to enable port reuse on message socket, err=" << strerror(errno);
     293            0 :                         exit(1);
     294              :                 }
     295            0 :                 if (setsockopt(message_socket_, IPPROTO_IP, IP_MULTICAST_LOOP, &yes, sizeof(yes)) < 0)
     296              :                 {
     297            0 :                         TLOG(TLVL_ERROR) << "Unable to enable multicast loopback on message socket, err=" << strerror(errno);
     298            0 :                         exit(1);
     299              :                 }
     300            0 :                 if (setsockopt(message_socket_, SOL_SOCKET, SO_BROADCAST, &yes, sizeof(yes)) == -1)
     301              :                 {
     302            0 :                         TLOG(TLVL_ERROR) << "Cannot set message socket to broadcast, err=" << strerror(errno);
     303            0 :                         exit(1);
     304              :                 }
     305              :         }
     306            0 : }
     307              : 
     308              : //======================================================================
     309              : // Message prefix filler ( overriddes ELdestination::fillPrefix )
     310              : //======================================================================
     311            0 : void ELUDP::fillPrefix(std::ostringstream& oss, const ErrorObj& msg)
     312              : {
     313            0 :         const auto& xid = msg.xid();
     314              : 
     315            0 :         auto id = xid.id();
     316            0 :         auto module = xid.module();
     317            0 :         auto app = app_;
     318            0 :         std::replace(id.begin(), id.end(), '|', '!');
     319            0 :         std::replace(app.begin(), app.end(), '|', '!');
     320            0 :         std::replace(module.begin(), module.end(), '|', '!');
     321              : 
     322            0 :         oss << format_.timestamp(msg.timestamp()) << "|";  // timestamp
     323            0 :         oss << std::to_string(++seqNum_) << "|";           // sequence number
     324            0 :         oss << hostname_ << "|";                           // host name
     325            0 :         oss << hostaddr_ << "|";                           // host address
     326            0 :         oss << xid.severity().getName() << "|";            // severity
     327            0 :         oss << id << "|";                                  // category
     328            0 :         oss << app << "|";                                 // application
     329            0 :         oss << pid_ << "|";
     330            0 :         oss << mf::GetIteration() << "|";  // run/event no
     331              : 
     332            0 :         oss << module << "|";  // module name
     333            0 :         if (filename_delimit_.empty())
     334              :         {
     335            0 :                 oss << msg.filename();
     336              :         }
     337            0 :         else if (filename_delimit_.size() == 1)  // for a single character (i.e '/'), search in reverse.
     338              :         {
     339            0 :                 oss << (strrchr(&msg.filename()[0], filename_delimit_[0]) != nullptr  // NOLINT(cppcoreguidelines-pro-bounds-pointer-arithmetic)
     340            0 :                             ? strrchr(&msg.filename()[0], filename_delimit_[0]) + 1   // NOLINT(cppcoreguidelines-pro-bounds-pointer-arithmetic)
     341            0 :                             : msg.filename());
     342              :         }
     343              :         else
     344              :         {
     345            0 :                 const char* cp = strstr(&msg.filename()[0], &filename_delimit_[0]);  // NOLINT(cppcoreguidelines-pro-bounds-pointer-arithmetic)
     346            0 :                 if (cp != nullptr)
     347              :                 {
     348              :                         // make sure to remove a part that ends with '/'
     349            0 :                         cp += filename_delimit_.size() - 1;  // NOLINT(cppcoreguidelines-pro-bounds-pointer-arithmetic)
     350            0 :                         while (*cp && *cp != '/') ++cp;
     351            0 :                         ++cp;  // increment past '/'
     352            0 :                         oss << cp;
     353              :                 }
     354              :                 else
     355            0 :                         oss << msg.filename();
     356              :         }
     357            0 :         oss << "|" << std::to_string(msg.lineNumber()) << "|";
     358            0 : }
     359              : 
     360              : //======================================================================
     361              : // Message filler ( overriddes ELdestination::fillUsrMsg )
     362              : //======================================================================
     363            0 : void ELUDP::fillUsrMsg(std::ostringstream& oss, const ErrorObj& msg)
     364              : {
     365            0 :         std::ostringstream tmposs;
     366              :         // Print the contents.
     367            0 :         for (auto const& val : msg.items())
     368              :         {
     369            0 :                 tmposs << val;
     370              :         }
     371              : 
     372              :         // remove leading "\n" if present
     373            0 :         const std::string& usrMsg = tmposs.str().compare(0, 1, "\n") == 0 ? tmposs.str().erase(0, 1) : tmposs.str();
     374              : 
     375            0 :         oss << usrMsg;
     376            0 : }
     377              : 
     378              : //======================================================================
     379              : // Message router ( overriddes ELdestination::routePayload )
     380              : //======================================================================
     381            0 : void ELUDP::routePayload(const std::ostringstream& oss, const ErrorObj& /*msg*/)
     382              : {
     383            0 :         if (message_socket_ == -1)
     384              :         {
     385            0 :                 reconnect_();
     386              :         }
     387            0 :         if (error_count_ < error_max_ || error_max_ == 0)
     388              :         {
     389              :                 char str[INET_ADDRSTRLEN];
     390            0 :                 inet_ntop(AF_INET, &(message_addr_.sin_addr), str, INET_ADDRSTRLEN);
     391              : 
     392            0 :                 auto string = "UDPMFMESSAGE" + std::to_string(pid_) + "|" + oss.str();
     393            0 :                 auto sts = sendto(message_socket_, string.c_str(), string.size(), 0, reinterpret_cast<struct sockaddr*>(&message_addr_),  // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
     394              :                                   sizeof(message_addr_));
     395              : 
     396            0 :                 if (sts < 0)
     397              :                 {
     398            0 :                         consecutive_success_count_ = 0;
     399            0 :                         ++error_count_;
     400            0 :                         if (error_count_ == next_error_report_)
     401              :                         {
     402            0 :                                 TLOG(TLVL_ERROR) << "Error sending message " << seqNum_ << " to " << host_ << ", errno=" << errno << " ("
     403            0 :                                                  << strerror(errno) << ")";
     404            0 :                                 next_error_report_ *= error_report_backoff_factor_;
     405              :                         }
     406              :                 }
     407              :                 else
     408              :                 {
     409            0 :                         ++consecutive_success_count_;
     410            0 :                         if (consecutive_success_count_ >= 5)
     411              :                         {
     412            0 :                                 error_count_ = 0;
     413            0 :                                 next_error_report_ = 1;
     414              :                         }
     415              :                 }
     416            0 :         }
     417            0 : }
     418              : }  // end namespace mfplugins
     419              : 
     420              : //======================================================================
     421              : //
     422              : // makePlugin function
     423              : //
     424              : //======================================================================
     425              : 
     426              : #ifndef EXTERN_C_FUNC_DECLARE_START
     427              : #define EXTERN_C_FUNC_DECLARE_START extern "C" {
     428              : #endif
     429              : 
     430              : EXTERN_C_FUNC_DECLARE_START
     431            0 : auto makePlugin(const std::string& /*unused*/, const fhicl::ParameterSet& pset)
     432              : {
     433            0 :         return std::make_unique<mfplugins::ELUDP>(pset);
     434              : }
     435              : }
     436              : 
     437            0 : DEFINE_BASIC_PLUGINTYPE_FUNC(mf::service::ELdestination)
        

Generated by: LCOV version 2.0-1