Line data Source code
1 : #include "cetlib/PluginTypeDeducer.h"
2 : #include "fhiclcpp/ParameterSet.h"
3 : #include "fhiclcpp/types/ConfigurationTable.h"
4 :
5 : #include "cetlib/compiler_macros.h"
6 : #include "messagefacility/MessageLogger/MessageLogger.h"
7 : #include "messagefacility/MessageService/ELdestination.h"
8 : #include "messagefacility/Utilities/ELseverityLevel.h"
9 : #include "messagefacility/Utilities/exception.h"
10 :
11 : // C/C++ includes
12 : #include <librdkafka/rdkafkacpp.h>
13 : #include <chrono>
14 : #include <fstream>
15 :
16 : #define TRACE_NAME "Kafka_mfPlugin"
17 : #include "trace.h"
18 :
19 : namespace mfplugins {
20 : using mf::ErrorObj;
21 : using mf::service::ELdestination;
22 :
23 : /// <summary>
24 : /// Message Facility Kafka Streamer Destination
25 : /// Sends messages via Kafka
26 : /// </summary>
27 : class ELKafka : public ELdestination
28 : {
29 : public:
30 : /**
31 : * \brief Configuration Parameters for ELKafka
32 : */
33 : struct Config
34 : {
35 : /// ELDestination common config parameters
36 : fhicl::TableFragment<ELdestination::Config> elDestConfig;
37 : /// "kafka_key" (Default: "artdaq"): Key for grouping log messages in Kafka
38 : fhicl::Atom<std::string> kafka_key = fhicl::Atom<std::string>{fhicl::Name{"kafka_key"}, fhicl::Comment{"Key for grouping log messages in Kafka"}, "artdaq"};
39 : /// "kafka_server" (Default: "localhost:30092"): Address of the Kafka service
40 : fhicl::Atom<std::string> kafka_server = fhicl::Atom<std::string>{fhicl::Name{"kafka_server"}, fhicl::Comment{"Address of the Kafka service"}, "localhost:30092"};
41 : /// "kafka_client" (Default: Application name from /proc/<pid>/cmdline): Name of this Kafka client
42 : fhicl::Atom<std::string> kafka_client = fhicl::Atom<std::string>{fhicl::Name{"kafka_client"}, fhicl::Comment{"Name of this Kafka client"}, ""};
43 : };
44 : /// Used for ParameterSet validation
45 : using Parameters = fhicl::WrappedTable<Config>;
46 :
47 : public:
48 : /// <summary>
49 : /// ELKafka Constructor
50 : /// </summary>
51 : /// <param name="pset">ParameterSet used to configure ELKafka</param>
52 : ELKafka(Parameters const& pset);
53 :
54 : /**
55 : * \brief Serialize a MessageFacility message to the output
56 : * \param o Stringstream object containing message data
57 : * \param e MessageFacility object containing header information
58 : */
59 : void routePayload(const std::ostringstream& o, const ErrorObj& e) override;
60 :
61 : private:
62 : // Parameters
63 : std::unique_ptr<RdKafka::Producer> kafka_producer_;
64 : std::string kafka_key_;
65 : };
66 :
67 : // END DECLARATION
68 : //======================================================================
69 : // BEGIN IMPLEMENTATION
70 :
71 : //======================================================================
72 : // ELKafka c'tor
73 : //======================================================================
74 :
75 0 : ELKafka::ELKafka(Parameters const& pset)
76 0 : : ELdestination(pset().elDestConfig())
77 0 : , kafka_key_(pset().kafka_key())
78 : {
79 : // get process name from '/proc/pid/cmdline'
80 0 : std::stringstream ss;
81 0 : ss << "//proc//" << getpid() << "//cmdline";
82 0 : std::ifstream procfile{ss.str().c_str()};
83 :
84 0 : std::string procinfo;
85 :
86 0 : if (procfile.is_open())
87 : {
88 0 : procfile >> procinfo;
89 0 : procfile.close();
90 : }
91 :
92 0 : size_t end = procinfo.find('\0');
93 0 : size_t start = procinfo.find_last_of('/', end);
94 :
95 0 : std::string app = procinfo.substr(start + 1, end - start - 1);
96 :
97 0 : RdKafka::Conf* k_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
98 0 : std::string errstr;
99 :
100 0 : k_conf->set("bootstrap.servers", pset().kafka_server(), errstr);
101 0 : if (errstr != "")
102 : {
103 0 : throw std::runtime_error(errstr);
104 : }
105 :
106 0 : std::string client_id = pset().kafka_client();
107 0 : if (client_id == "")
108 : {
109 0 : client_id = app;
110 : }
111 :
112 0 : k_conf->set("client.id", client_id, errstr);
113 0 : if (errstr != "")
114 : {
115 0 : throw std::runtime_error(errstr);
116 : }
117 :
118 : // Create producer instance
119 0 : kafka_producer_.reset(RdKafka::Producer::create(k_conf, errstr));
120 :
121 0 : if (errstr != "")
122 : {
123 0 : throw std::runtime_error(errstr);
124 : }
125 0 : }
126 :
127 : //======================================================================
128 : // Message router ( overriddes ELdestination::routePayload )
129 : //======================================================================
130 0 : void ELKafka::routePayload(const std::ostringstream& oss, const ErrorObj& msg)
131 : {
132 0 : const auto& xid = msg.xid();
133 : // get the topic
134 0 : auto topic = xid.id();
135 :
136 0 : auto key = kafka_key_;
137 :
138 0 : std::string binary = oss.str();
139 0 : uint64_t timestamp_ms = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
140 :
141 : // RdKafka::Producer::RK_MSG_COPY to be investigated
142 0 : RdKafka::ErrorCode err = kafka_producer_->produce(topic,
143 : RdKafka::Topic::PARTITION_UA,
144 : RdKafka::Producer::RK_MSG_COPY,
145 0 : const_cast<char*>(binary.c_str()), binary.size(),
146 0 : key.c_str(),
147 : key.size(),
148 : timestamp_ms,
149 : nullptr);
150 0 : if (err != RdKafka::ERR_NO_ERROR)
151 : {
152 0 : TLOG(TLVL_ERROR) << "Error sending message to Kafka: " << RdKafka::err2str(err);
153 : }
154 0 : }
155 : } // end namespace mfplugins
156 :
157 : //======================================================================
158 : //
159 : // makePlugin function
160 : //
161 : //======================================================================
162 :
163 : #ifndef EXTERN_C_FUNC_DECLARE_START
164 : #define EXTERN_C_FUNC_DECLARE_START extern "C" {
165 : #endif
166 :
167 : EXTERN_C_FUNC_DECLARE_START
168 0 : auto makePlugin(const std::string& /*unused*/, const fhicl::ParameterSet& pset)
169 : {
170 0 : return std::make_unique<mfplugins::ELKafka>(pset);
171 : }
172 : }
173 :
174 0 : DEFINE_BASIC_PLUGINTYPE_FUNC(mf::service::ELdestination)
|