Line data Source code
1 : #define TRACE_NAME "UDP_Receiver"
2 :
3 : #include "mfextensions/Receivers/UDP_receiver.hh"
4 : #include <sys/poll.h>
5 : #include <sstream>
6 : #include "messagefacility/Utilities/ELseverityLevel.h"
7 : #include "mfextensions/Receivers/ReceiverMacros.hh"
8 : #include "mfextensions/Receivers/detail/TCPConnect.hh"
9 :
10 0 : mfviewer::UDPReceiver::UDPReceiver(fhicl::ParameterSet const& pset)
11 : : MVReceiver(pset)
12 0 : , message_port_(pset.get<int>("port", 5140))
13 0 : , message_addr_(pset.get<std::string>("message_address", "227.128.12.27"))
14 0 : , multicast_enable_(pset.get<bool>("multicast_enable", false))
15 0 : , multicast_out_addr_(pset.get<std::string>("multicast_interface_ip", "0.0.0.0"))
16 0 : , message_socket_(-1)
17 : {
18 0 : TLOG(TLVL_DEBUG + 33) << "UDPReceiver Constructor";
19 0 : this->setObjectName("viewer UDP");
20 0 : }
21 :
22 0 : void mfviewer::UDPReceiver::setupMessageListener_()
23 : {
24 0 : TLOG(TLVL_INFO) << "Setting up message listen socket, address=" << message_addr_ << ":" << message_port_;
25 0 : message_socket_ = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
26 0 : if (message_socket_ < 0)
27 : {
28 0 : TLOG(TLVL_ERROR) << "Error creating socket for receiving messages! err=" << strerror(errno);
29 0 : exit(1);
30 : }
31 :
32 : struct sockaddr_in si_me_request;
33 :
34 0 : int yes = 1;
35 0 : if (setsockopt(message_socket_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)) < 0)
36 : {
37 0 : TLOG(TLVL_ERROR) << "Unable to enable port reuse on message socket, err=" << strerror(errno);
38 0 : exit(1);
39 : }
40 0 : memset(&si_me_request, 0, sizeof(si_me_request));
41 0 : si_me_request.sin_family = AF_INET;
42 0 : si_me_request.sin_port = htons(message_port_);
43 0 : si_me_request.sin_addr.s_addr = htonl(INADDR_ANY);
44 0 : if (bind(message_socket_, reinterpret_cast<struct sockaddr*>(&si_me_request), sizeof(si_me_request)) == -1) // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
45 : {
46 0 : TLOG(TLVL_ERROR) << "Cannot bind message socket to port " << message_port_ << ", err=" << strerror(errno);
47 0 : exit(1);
48 : }
49 :
50 0 : if (message_addr_ != "localhost" && multicast_enable_)
51 : {
52 : struct ip_mreq mreq;
53 0 : int sts = ResolveHost(message_addr_.c_str(), mreq.imr_multiaddr);
54 0 : if (sts == -1)
55 : {
56 0 : TLOG(TLVL_ERROR) << "Unable to resolve multicast message address, err=" << strerror(errno);
57 0 : exit(1);
58 : }
59 0 : sts = GetInterfaceForNetwork(multicast_out_addr_.c_str(), mreq.imr_interface);
60 0 : if (sts == -1)
61 : {
62 0 : TLOG(TLVL_ERROR) << "Unable to resolve hostname for " << multicast_out_addr_;
63 0 : exit(1);
64 : }
65 0 : if (setsockopt(message_socket_, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)) < 0)
66 : {
67 0 : TLOG(TLVL_ERROR) << "Unable to join multicast group, err=" << strerror(errno);
68 0 : exit(1);
69 : }
70 : }
71 0 : TLOG(TLVL_INFO) << "Done setting up message receive socket";
72 0 : }
73 :
74 0 : mfviewer::UDPReceiver::~UDPReceiver()
75 : {
76 0 : TLOG(TLVL_DEBUG + 32) << "Closing message receive socket";
77 0 : close(message_socket_);
78 0 : message_socket_ = -1;
79 0 : }
80 :
81 0 : void mfviewer::UDPReceiver::run()
82 : {
83 0 : while (!stopRequested_)
84 : {
85 0 : if (message_socket_ == -1) setupMessageListener_();
86 :
87 0 : int ms_to_wait = 10;
88 : struct pollfd ufds[1];
89 0 : ufds[0].fd = message_socket_;
90 0 : ufds[0].events = POLLIN | POLLPRI | POLLERR;
91 0 : int rv = poll(ufds, 1, ms_to_wait);
92 :
93 : // Continue loop if no message received or message does not have correct event ID
94 0 : if (rv <= 0 || (ufds[0].revents != POLLIN && ufds[0].revents != POLLPRI))
95 : {
96 0 : if (rv == 1 && (ufds[0].revents & (POLLNVAL | POLLERR | POLLHUP)))
97 : {
98 0 : close(message_socket_);
99 0 : message_socket_ = -1;
100 : }
101 0 : if (stopRequested_)
102 : {
103 0 : break;
104 : }
105 0 : continue;
106 : }
107 :
108 : char buffer[TRACE_STREAMER_MSGMAX + 1];
109 0 : auto packetSize = read(message_socket_, &buffer, TRACE_STREAMER_MSGMAX);
110 0 : if (packetSize < 0)
111 : {
112 0 : TLOG(TLVL_ERROR) << "Error receiving message, errno=" << errno << " (" << strerror(errno) << ")";
113 : }
114 : else
115 : {
116 0 : TLOG(TLVL_DEBUG + 33) << "Recieved message; validating...(packetSize=" << packetSize << ")";
117 0 : std::string message(buffer, buffer + packetSize); // NOLINT(cppcoreguidelines-pro-bounds-pointer-arithmetic)
118 0 : if (validate_packet(message))
119 : {
120 0 : TLOG(TLVL_DEBUG + 33) << "Valid UDP Message received! Sending to GUI!";
121 0 : emit NewMessage(read_msg(message));
122 : }
123 0 : }
124 : }
125 0 : TLOG(TLVL_INFO) << "UDPReceiver shutting down!";
126 0 : }
127 :
128 0 : std::list<std::string> mfviewer::UDPReceiver::tokenize_(std::string const& input)
129 : {
130 0 : size_t pos = 0;
131 0 : std::list<std::string> output;
132 :
133 0 : while (pos != std::string::npos && pos < input.size())
134 : {
135 0 : auto newpos = input.find('|', pos);
136 0 : if (newpos != std::string::npos)
137 : {
138 0 : output.emplace_back(input, pos, newpos - pos);
139 : // TLOG(TLVL_DEBUG + 33) << "tokenize_: " << output.back();
140 0 : pos = newpos + 1;
141 : }
142 : else
143 : {
144 0 : output.emplace_back(input, pos);
145 : // TLOG(TLVL_DEBUG + 33) << "tokenize_: " << output.back();
146 0 : pos = newpos;
147 : }
148 : }
149 0 : return output;
150 0 : }
151 :
152 0 : msg_ptr_t mfviewer::UDPReceiver::read_msg(std::string const& input)
153 : {
154 0 : std::string hostname, category, application, message, hostaddr, file, line, module, eventID;
155 0 : mf::ELseverityLevel sev;
156 0 : timeval tv = {0, 0};
157 0 : int pid = 0;
158 0 : int seqNum = 0;
159 :
160 0 : TLOG(TLVL_DEBUG + 33) << "Recieved MF/Syslog message with contents: " << input;
161 :
162 0 : auto tokens = tokenize_(input);
163 0 : auto it = tokens.begin();
164 :
165 0 : if (it != tokens.end())
166 : {
167 0 : bool timestamp_found = false;
168 : struct tm tm;
169 : time_t t;
170 0 : while (it != tokens.end() && !timestamp_found)
171 : {
172 0 : std::string thisString = *it;
173 0 : while (!thisString.empty() && !timestamp_found)
174 : {
175 0 : auto pos = thisString.find_first_of("0123456789");
176 0 : if (pos != std::string::npos)
177 : {
178 0 : thisString = thisString.erase(0, pos);
179 : // TLOG(TLVL_DEBUG + 33) << "thisString: " << thisString;
180 :
181 0 : if (strptime(thisString.c_str(), "%d-%b-%Y %H:%M:%S", &tm) != nullptr)
182 : {
183 0 : timestamp_found = true;
184 0 : break;
185 : }
186 :
187 0 : if (!thisString.empty())
188 0 : thisString = thisString.erase(0, 1);
189 : }
190 : }
191 0 : ++it;
192 0 : }
193 :
194 0 : tm.tm_isdst = -1;
195 0 : t = mktime(&tm);
196 0 : tv.tv_sec = t;
197 0 : tv.tv_usec = 0;
198 :
199 0 : auto prevIt = it;
200 : try
201 : {
202 0 : if (it != tokens.end() && ++it != tokens.end() /* Advances it */)
203 : {
204 0 : seqNum = std::stoi(*it);
205 : }
206 : }
207 0 : catch (const std::invalid_argument& e)
208 : {
209 0 : it = prevIt;
210 0 : }
211 0 : if (it != tokens.end() && ++it != tokens.end() /* Advances it */)
212 : {
213 0 : hostname = *it;
214 : }
215 0 : if (it != tokens.end() && ++it != tokens.end() /* Advances it */)
216 : {
217 0 : hostaddr = *it;
218 : }
219 0 : if (it != tokens.end() && ++it != tokens.end() /* Advances it */)
220 : {
221 0 : sev = mf::ELseverityLevel(*it);
222 : }
223 0 : if (it != tokens.end() && ++it != tokens.end() /* Advances it */)
224 : {
225 0 : category = *it;
226 : }
227 0 : if (it != tokens.end() && ++it != tokens.end() /* Advances it */)
228 : {
229 0 : application = *it;
230 : }
231 0 : prevIt = it;
232 : try
233 : {
234 0 : if (it != tokens.end() && ++it != tokens.end() /* Advances it */)
235 : {
236 0 : pid = std::stol(*it);
237 : }
238 : }
239 0 : catch (const std::invalid_argument& e)
240 : {
241 0 : it = prevIt;
242 0 : }
243 0 : if (it != tokens.end() && ++it != tokens.end() /* Advances it */)
244 : {
245 0 : eventID = *it;
246 : }
247 0 : if (it != tokens.end() && ++it != tokens.end() /* Advances it */)
248 : {
249 0 : module = *it;
250 : }
251 0 : if (it != tokens.end() && ++it != tokens.end() /* Advances it */)
252 : {
253 0 : file = *it;
254 : }
255 0 : if (it != tokens.end() && ++it != tokens.end() /* Advances it */)
256 : {
257 0 : line = *it;
258 : }
259 0 : std::ostringstream oss;
260 0 : bool first = true;
261 0 : while (it != tokens.end() && ++it != tokens.end() /* Advances it */)
262 : {
263 0 : if (!first)
264 : {
265 0 : oss << "|";
266 : }
267 : else
268 : {
269 0 : first = false;
270 : }
271 0 : oss << *it;
272 : }
273 0 : TLOG(TLVL_DEBUG + 33) << "Message content: " << oss.str();
274 0 : message = oss.str();
275 0 : }
276 :
277 0 : auto msg = std::make_shared<qt_mf_msg>(hostname, category, application, pid, tv);
278 0 : msg->setSeverity(sev);
279 0 : msg->setMessage("UDPMessage", seqNum, message);
280 0 : msg->setHostAddr(hostaddr);
281 0 : msg->setFileName(file);
282 0 : msg->setLineNumber(line);
283 0 : msg->setModule(module);
284 0 : msg->setEventID(eventID);
285 0 : msg->updateText();
286 :
287 0 : return msg;
288 0 : }
289 :
290 0 : bool mfviewer::UDPReceiver::validate_packet(std::string const& input)
291 : {
292 : // Run some checks on the input packet
293 0 : if (input.find("MF") == std::string::npos)
294 : {
295 0 : TLOG(TLVL_WARNING) << "Failed to find \"MF\" in message: " << input;
296 0 : return false;
297 : }
298 0 : if (input.find("|") == std::string::npos)
299 : {
300 0 : TLOG(TLVL_WARNING) << "Failed to find | separator character in message: " << input;
301 0 : return false;
302 : }
303 0 : return true;
304 : }
305 :
306 : #include "moc_UDP_receiver.cpp"
307 :
308 0 : DEFINE_MFVIEWER_RECEIVER(mfviewer::UDPReceiver)
|