LCOV - code coverage report
Current view: top level - /opt/artdaq/srcs/artdaq-mfextensions/mfextensions/Receivers - UDP_receiver.cc (source / functions) Coverage Total Hit
Test: artdaq.info.cleaned Lines: 0.0 % 170 0
Test Date: 2025-09-04 00:45:34 Functions: 0.0 % 27 0

            Line data    Source code
       1              : #define TRACE_NAME "UDP_Receiver"
       2              : 
       3              : #include "mfextensions/Receivers/UDP_receiver.hh"
       4              : #include <sys/poll.h>
       5              : #include <sstream>
       6              : #include "messagefacility/Utilities/ELseverityLevel.h"
       7              : #include "mfextensions/Receivers/ReceiverMacros.hh"
       8              : #include "mfextensions/Receivers/detail/TCPConnect.hh"
       9              : 
      10            0 : mfviewer::UDPReceiver::UDPReceiver(fhicl::ParameterSet const& pset)
      11              :     : MVReceiver(pset)
      12            0 :     , message_port_(pset.get<int>("port", 5140))
      13            0 :     , message_addr_(pset.get<std::string>("message_address", "227.128.12.27"))
      14            0 :     , multicast_enable_(pset.get<bool>("multicast_enable", false))
      15            0 :     , multicast_out_addr_(pset.get<std::string>("multicast_interface_ip", "0.0.0.0"))
      16            0 :     , message_socket_(-1)
      17              : {
      18            0 :         TLOG(TLVL_DEBUG + 33) << "UDPReceiver Constructor";
      19            0 :         this->setObjectName("viewer UDP");
      20            0 : }
      21              : 
      22            0 : void mfviewer::UDPReceiver::setupMessageListener_()
      23              : {
      24            0 :         TLOG(TLVL_INFO) << "Setting up message listen socket, address=" << message_addr_ << ":" << message_port_;
      25            0 :         message_socket_ = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
      26            0 :         if (message_socket_ < 0)
      27              :         {
      28            0 :                 TLOG(TLVL_ERROR) << "Error creating socket for receiving messages! err=" << strerror(errno);
      29            0 :                 exit(1);
      30              :         }
      31              : 
      32              :         struct sockaddr_in si_me_request;
      33              : 
      34            0 :         int yes = 1;
      35            0 :         if (setsockopt(message_socket_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)) < 0)
      36              :         {
      37            0 :                 TLOG(TLVL_ERROR) << "Unable to enable port reuse on message socket, err=" << strerror(errno);
      38            0 :                 exit(1);
      39              :         }
      40            0 :         memset(&si_me_request, 0, sizeof(si_me_request));
      41            0 :         si_me_request.sin_family = AF_INET;
      42            0 :         si_me_request.sin_port = htons(message_port_);
      43            0 :         si_me_request.sin_addr.s_addr = htonl(INADDR_ANY);
      44            0 :         if (bind(message_socket_, reinterpret_cast<struct sockaddr*>(&si_me_request), sizeof(si_me_request)) == -1)  // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
      45              :         {
      46            0 :                 TLOG(TLVL_ERROR) << "Cannot bind message socket to port " << message_port_ << ", err=" << strerror(errno);
      47            0 :                 exit(1);
      48              :         }
      49              : 
      50            0 :         if (message_addr_ != "localhost" && multicast_enable_)
      51              :         {
      52              :                 struct ip_mreq mreq;
      53            0 :                 int sts = ResolveHost(message_addr_.c_str(), mreq.imr_multiaddr);
      54            0 :                 if (sts == -1)
      55              :                 {
      56            0 :                         TLOG(TLVL_ERROR) << "Unable to resolve multicast message address, err=" << strerror(errno);
      57            0 :                         exit(1);
      58              :                 }
      59            0 :                 sts = GetInterfaceForNetwork(multicast_out_addr_.c_str(), mreq.imr_interface);
      60            0 :                 if (sts == -1)
      61              :                 {
      62            0 :                         TLOG(TLVL_ERROR) << "Unable to resolve hostname for " << multicast_out_addr_;
      63            0 :                         exit(1);
      64              :                 }
      65            0 :                 if (setsockopt(message_socket_, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)) < 0)
      66              :                 {
      67            0 :                         TLOG(TLVL_ERROR) << "Unable to join multicast group, err=" << strerror(errno);
      68            0 :                         exit(1);
      69              :                 }
      70              :         }
      71            0 :         TLOG(TLVL_INFO) << "Done setting up message receive socket";
      72            0 : }
      73              : 
      74            0 : mfviewer::UDPReceiver::~UDPReceiver()
      75              : {
      76            0 :         TLOG(TLVL_DEBUG + 32) << "Closing message receive socket";
      77            0 :         close(message_socket_);
      78            0 :         message_socket_ = -1;
      79            0 : }
      80              : 
      81            0 : void mfviewer::UDPReceiver::run()
      82              : {
      83            0 :         while (!stopRequested_)
      84              :         {
      85            0 :                 if (message_socket_ == -1) setupMessageListener_();
      86              : 
      87            0 :                 int ms_to_wait = 10;
      88              :                 struct pollfd ufds[1];
      89            0 :                 ufds[0].fd = message_socket_;
      90            0 :                 ufds[0].events = POLLIN | POLLPRI | POLLERR;
      91            0 :                 int rv = poll(ufds, 1, ms_to_wait);
      92              : 
      93              :                 // Continue loop if no message received or message does not have correct event ID
      94            0 :                 if (rv <= 0 || (ufds[0].revents != POLLIN && ufds[0].revents != POLLPRI))
      95              :                 {
      96            0 :                         if (rv == 1 && (ufds[0].revents & (POLLNVAL | POLLERR | POLLHUP)))
      97              :                         {
      98            0 :                                 close(message_socket_);
      99            0 :                                 message_socket_ = -1;
     100              :                         }
     101            0 :                         if (stopRequested_)
     102              :                         {
     103            0 :                                 break;
     104              :                         }
     105            0 :                         continue;
     106              :                 }
     107              : 
     108              :                 char buffer[TRACE_STREAMER_MSGMAX + 1];
     109            0 :                 auto packetSize = read(message_socket_, &buffer, TRACE_STREAMER_MSGMAX);
     110            0 :                 if (packetSize < 0)
     111              :                 {
     112            0 :                         TLOG(TLVL_ERROR) << "Error receiving message, errno=" << errno << " (" << strerror(errno) << ")";
     113              :                 }
     114              :                 else
     115              :                 {
     116            0 :                         TLOG(TLVL_DEBUG + 33) << "Recieved message; validating...(packetSize=" << packetSize << ")";
     117            0 :                         std::string message(buffer, buffer + packetSize);  // NOLINT(cppcoreguidelines-pro-bounds-pointer-arithmetic)
     118            0 :                         if (validate_packet(message))
     119              :                         {
     120            0 :                                 TLOG(TLVL_DEBUG + 33) << "Valid UDP Message received! Sending to GUI!";
     121            0 :                                 emit NewMessage(read_msg(message));
     122              :                         }
     123            0 :                 }
     124              :         }
     125            0 :         TLOG(TLVL_INFO) << "UDPReceiver shutting down!";
     126            0 : }
     127              : 
     128            0 : std::list<std::string> mfviewer::UDPReceiver::tokenize_(std::string const& input)
     129              : {
     130            0 :         size_t pos = 0;
     131            0 :         std::list<std::string> output;
     132              : 
     133            0 :         while (pos != std::string::npos && pos < input.size())
     134              :         {
     135            0 :                 auto newpos = input.find('|', pos);
     136            0 :                 if (newpos != std::string::npos)
     137              :                 {
     138            0 :                         output.emplace_back(input, pos, newpos - pos);
     139              :                         // TLOG(TLVL_DEBUG + 33) << "tokenize_: " << output.back();
     140            0 :                         pos = newpos + 1;
     141              :                 }
     142              :                 else
     143              :                 {
     144            0 :                         output.emplace_back(input, pos);
     145              :                         // TLOG(TLVL_DEBUG + 33) << "tokenize_: " << output.back();
     146            0 :                         pos = newpos;
     147              :                 }
     148              :         }
     149            0 :         return output;
     150            0 : }
     151              : 
     152            0 : msg_ptr_t mfviewer::UDPReceiver::read_msg(std::string const& input)
     153              : {
     154            0 :         std::string hostname, category, application, message, hostaddr, file, line, module, eventID;
     155            0 :         mf::ELseverityLevel sev;
     156            0 :         timeval tv = {0, 0};
     157            0 :         int pid = 0;
     158            0 :         int seqNum = 0;
     159              : 
     160            0 :         TLOG(TLVL_DEBUG + 33) << "Recieved MF/Syslog message with contents: " << input;
     161              : 
     162            0 :         auto tokens = tokenize_(input);
     163            0 :         auto it = tokens.begin();
     164              : 
     165            0 :         if (it != tokens.end())
     166              :         {
     167            0 :                 bool timestamp_found = false;
     168              :                 struct tm tm;
     169              :                 time_t t;
     170            0 :                 while (it != tokens.end() && !timestamp_found)
     171              :                 {
     172            0 :                         std::string thisString = *it;
     173            0 :                         while (!thisString.empty() && !timestamp_found)
     174              :                         {
     175            0 :                                 auto pos = thisString.find_first_of("0123456789");
     176            0 :                                 if (pos != std::string::npos)
     177              :                                 {
     178            0 :                                         thisString = thisString.erase(0, pos);
     179              :                                         // TLOG(TLVL_DEBUG + 33) << "thisString: " << thisString;
     180              : 
     181            0 :                                         if (strptime(thisString.c_str(), "%d-%b-%Y %H:%M:%S", &tm) != nullptr)
     182              :                                         {
     183            0 :                                                 timestamp_found = true;
     184            0 :                                                 break;
     185              :                                         }
     186              : 
     187            0 :                                         if (!thisString.empty())
     188            0 :                                                 thisString = thisString.erase(0, 1);
     189              :                                 }
     190              :                         }
     191            0 :                         ++it;
     192            0 :                 }
     193              : 
     194            0 :                 tm.tm_isdst = -1;
     195            0 :                 t = mktime(&tm);
     196            0 :                 tv.tv_sec = t;
     197            0 :                 tv.tv_usec = 0;
     198              : 
     199            0 :                 auto prevIt = it;
     200              :                 try
     201              :                 {
     202            0 :                         if (it != tokens.end() && ++it != tokens.end() /* Advances it */)
     203              :                         {
     204            0 :                                 seqNum = std::stoi(*it);
     205              :                         }
     206              :                 }
     207            0 :                 catch (const std::invalid_argument& e)
     208              :                 {
     209            0 :                         it = prevIt;
     210            0 :                 }
     211            0 :                 if (it != tokens.end() && ++it != tokens.end() /* Advances it */)
     212              :                 {
     213            0 :                         hostname = *it;
     214              :                 }
     215            0 :                 if (it != tokens.end() && ++it != tokens.end() /* Advances it */)
     216              :                 {
     217            0 :                         hostaddr = *it;
     218              :                 }
     219            0 :                 if (it != tokens.end() && ++it != tokens.end() /* Advances it */)
     220              :                 {
     221            0 :                         sev = mf::ELseverityLevel(*it);
     222              :                 }
     223            0 :                 if (it != tokens.end() && ++it != tokens.end() /* Advances it */)
     224              :                 {
     225            0 :                         category = *it;
     226              :                 }
     227            0 :                 if (it != tokens.end() && ++it != tokens.end() /* Advances it */)
     228              :                 {
     229            0 :                         application = *it;
     230              :                 }
     231            0 :                 prevIt = it;
     232              :                 try
     233              :                 {
     234            0 :                         if (it != tokens.end() && ++it != tokens.end() /* Advances it */)
     235              :                         {
     236            0 :                                 pid = std::stol(*it);
     237              :                         }
     238              :                 }
     239            0 :                 catch (const std::invalid_argument& e)
     240              :                 {
     241            0 :                         it = prevIt;
     242            0 :                 }
     243            0 :                 if (it != tokens.end() && ++it != tokens.end() /* Advances it */)
     244              :                 {
     245            0 :                         eventID = *it;
     246              :                 }
     247            0 :                 if (it != tokens.end() && ++it != tokens.end() /* Advances it */)
     248              :                 {
     249            0 :                         module = *it;
     250              :                 }
     251            0 :                 if (it != tokens.end() && ++it != tokens.end() /* Advances it */)
     252              :                 {
     253            0 :                         file = *it;
     254              :                 }
     255            0 :                 if (it != tokens.end() && ++it != tokens.end() /* Advances it */)
     256              :                 {
     257            0 :                         line = *it;
     258              :                 }
     259            0 :                 std::ostringstream oss;
     260            0 :                 bool first = true;
     261            0 :                 while (it != tokens.end() && ++it != tokens.end() /* Advances it */)
     262              :                 {
     263            0 :                         if (!first)
     264              :                         {
     265            0 :                                 oss << "|";
     266              :                         }
     267              :                         else
     268              :                         {
     269            0 :                                 first = false;
     270              :                         }
     271            0 :                         oss << *it;
     272              :                 }
     273            0 :                 TLOG(TLVL_DEBUG + 33) << "Message content: " << oss.str();
     274            0 :                 message = oss.str();
     275            0 :         }
     276              : 
     277            0 :         auto msg = std::make_shared<qt_mf_msg>(hostname, category, application, pid, tv);
     278            0 :         msg->setSeverity(sev);
     279            0 :         msg->setMessage("UDPMessage", seqNum, message);
     280            0 :         msg->setHostAddr(hostaddr);
     281            0 :         msg->setFileName(file);
     282            0 :         msg->setLineNumber(line);
     283            0 :         msg->setModule(module);
     284            0 :         msg->setEventID(eventID);
     285            0 :         msg->updateText();
     286              : 
     287            0 :         return msg;
     288            0 : }
     289              : 
     290            0 : bool mfviewer::UDPReceiver::validate_packet(std::string const& input)
     291              : {
     292              :         // Run some checks on the input packet
     293            0 :         if (input.find("MF") == std::string::npos)
     294              :         {
     295            0 :                 TLOG(TLVL_WARNING) << "Failed to find \"MF\" in message: " << input;
     296            0 :                 return false;
     297              :         }
     298            0 :         if (input.find("|") == std::string::npos)
     299              :         {
     300            0 :                 TLOG(TLVL_WARNING) << "Failed to find | separator character in message: " << input;
     301            0 :                 return false;
     302              :         }
     303            0 :         return true;
     304              : }
     305              : 
     306              : #include "moc_UDP_receiver.cpp"
     307              : 
     // Invokes the project macro DEFINE_MFVIEWER_RECEIVER for mfviewer::UDPReceiver.
     // The macro is not defined in this file (ReceiverMacros.hh is included above).
     // NOTE(review): presumably it emits the factory hook through which the viewer
     // instantiates this receiver -- confirm against the macro definition.
     308            0 : DEFINE_MFVIEWER_RECEIVER(mfviewer::UDPReceiver)
        

Generated by: LCOV version 2.0-1