LCOV - code coverage report
Current view: top level - /opt/artdaq/srcs/artdaq-mfextensions/mfextensions/Destinations - Kafka_mfPlugin.cc (source / functions) Coverage Total Hit
Test: artdaq.info.cleaned Lines: 0.0 % 43 0
Test Date: 2025-09-04 00:45:34 Functions: 0.0 % 5 0

            Line data    Source code
       1              : #include "cetlib/PluginTypeDeducer.h"
       2              : #include "fhiclcpp/ParameterSet.h"
       3              : #include "fhiclcpp/types/ConfigurationTable.h"
       4              : 
       5              : #include "cetlib/compiler_macros.h"
       6              : #include "messagefacility/MessageLogger/MessageLogger.h"
       7              : #include "messagefacility/MessageService/ELdestination.h"
       8              : #include "messagefacility/Utilities/ELseverityLevel.h"
       9              : #include "messagefacility/Utilities/exception.h"
      10              : 
      11              : // C/C++ includes
      12              : #include <librdkafka/rdkafkacpp.h>
      13              : #include <chrono>
      14              : #include <fstream>
      15              : 
      16              : #define TRACE_NAME "Kafka_mfPlugin"
      17              : #include "trace.h"
      18              : 
      19              : namespace mfplugins {
      20              : using mf::ErrorObj;
      21              : using mf::service::ELdestination;
      22              : 
      23              : /// <summary>
      24              : /// Message Facility Kafka Streamer Destination
      25              : /// Sends messages via Kafka
      26              : /// </summary>
      27              : class ELKafka : public ELdestination
      28              : {
      29              : public:
      30              :         /**
      31              :          * \brief Configuration Parameters for ELKafka
      32              :          */
      33              :         struct Config
      34              :         {
      35              :                 /// ELDestination common config parameters
      36              :                 fhicl::TableFragment<ELdestination::Config> elDestConfig;
      37              :                 /// "kafka_key" (Default: "artdaq"): Key for grouping log messages in Kafka
      38              :                 fhicl::Atom<std::string> kafka_key = fhicl::Atom<std::string>{fhicl::Name{"kafka_key"}, fhicl::Comment{"Key for grouping log messages in Kafka"}, "artdaq"};
      39              :                 /// "kafka_server" (Default: "localhost:30092"): Address of the Kafka service
      40              :                 fhicl::Atom<std::string> kafka_server = fhicl::Atom<std::string>{fhicl::Name{"kafka_server"}, fhicl::Comment{"Address of the Kafka service"}, "localhost:30092"};
      41              :                 /// "kafka_client" (Default: Application name from /proc/<pid>/cmdline): Name of this Kafka client
      42              :                 fhicl::Atom<std::string> kafka_client = fhicl::Atom<std::string>{fhicl::Name{"kafka_client"}, fhicl::Comment{"Name of this Kafka client"}, ""};
      43              :         };
      44              :         /// Used for ParameterSet validation
      45              :         using Parameters = fhicl::WrappedTable<Config>;
      46              : 
      47              : public:
      48              :         /// <summary>
      49              :         /// ELKafka Constructor
      50              :         /// </summary>
      51              :         /// <param name="pset">ParameterSet used to configure ELKafka</param>
      52              :         ELKafka(Parameters const& pset);
      53              : 
      54              :         /**
      55              :          * \brief Serialize a MessageFacility message to the output
      56              :          * \param o Stringstream object containing message data
      57              :          * \param e MessageFacility object containing header information
      58              :          */
      59              :         void routePayload(const std::ostringstream& o, const ErrorObj& e) override;
      60              : 
      61              : private:
      62              :         // Parameters
      63              :         std::unique_ptr<RdKafka::Producer> kafka_producer_;
      64              :         std::string kafka_key_;
      65              : };
      66              : 
      67              : // END DECLARATION
      68              : //======================================================================
      69              : // BEGIN IMPLEMENTATION
      70              : 
      71              : //======================================================================
      72              : // ELKafka c'tor
      73              : //======================================================================
      74              : 
      75            0 : ELKafka::ELKafka(Parameters const& pset)
      76            0 :     : ELdestination(pset().elDestConfig())
      77            0 :     , kafka_key_(pset().kafka_key())
      78              : {
      79              :         // get process name from '/proc/pid/cmdline'
      80            0 :         std::stringstream ss;
      81            0 :         ss << "//proc//" << getpid() << "//cmdline";
      82            0 :         std::ifstream procfile{ss.str().c_str()};
      83              : 
      84            0 :         std::string procinfo;
      85              : 
      86            0 :         if (procfile.is_open())
      87              :         {
      88            0 :                 procfile >> procinfo;
      89            0 :                 procfile.close();
      90              :         }
      91              : 
      92            0 :         size_t end = procinfo.find('\0');
      93            0 :         size_t start = procinfo.find_last_of('/', end);
      94              : 
      95            0 :         std::string app = procinfo.substr(start + 1, end - start - 1);
      96              : 
      97            0 :         RdKafka::Conf* k_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
      98            0 :         std::string errstr;
      99              : 
     100            0 :         k_conf->set("bootstrap.servers", pset().kafka_server(), errstr);
     101            0 :         if (errstr != "")
     102              :         {
     103            0 :                 throw std::runtime_error(errstr);
     104              :         }
     105              : 
     106            0 :         std::string client_id = pset().kafka_client();
     107            0 :         if (client_id == "")
     108              :         {
     109            0 :                 client_id = app;
     110              :         }
     111              : 
     112            0 :         k_conf->set("client.id", client_id, errstr);
     113            0 :         if (errstr != "")
     114              :         {
     115            0 :                 throw std::runtime_error(errstr);
     116              :         }
     117              : 
     118              :         // Create producer instance
     119            0 :         kafka_producer_.reset(RdKafka::Producer::create(k_conf, errstr));
     120              : 
     121            0 :         if (errstr != "")
     122              :         {
     123            0 :                 throw std::runtime_error(errstr);
     124              :         }
     125            0 : }
     126              : 
     127              : //======================================================================
     128              : // Message router ( overriddes ELdestination::routePayload )
     129              : //======================================================================
     130            0 : void ELKafka::routePayload(const std::ostringstream& oss, const ErrorObj& msg)
     131              : {
     132            0 :         const auto& xid = msg.xid();
     133              :         // get the topic
     134            0 :         auto topic = xid.id();
     135              : 
     136            0 :         auto key = kafka_key_;
     137              : 
     138            0 :         std::string binary = oss.str();
     139            0 :         uint64_t timestamp_ms = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
     140              : 
     141              :         //      RdKafka::Producer::RK_MSG_COPY to be investigated
     142            0 :         RdKafka::ErrorCode err = kafka_producer_->produce(topic,
     143              :                                                           RdKafka::Topic::PARTITION_UA,
     144              :                                                           RdKafka::Producer::RK_MSG_COPY,
     145            0 :                                                           const_cast<char*>(binary.c_str()), binary.size(),
     146            0 :                                                           key.c_str(),
     147              :                                                           key.size(),
     148              :                                                           timestamp_ms,
     149              :                                                           nullptr);
     150            0 :         if (err != RdKafka::ERR_NO_ERROR)
     151              :         {
     152            0 :                 TLOG(TLVL_ERROR) << "Error sending message to Kafka: " << RdKafka::err2str(err);
     153              :         }
     154            0 : }
     155              : }  // end namespace mfplugins
     156              : 
     157              : //======================================================================
     158              : //
     159              : // makePlugin function
     160              : //
     161              : //======================================================================
     162              : 
     163              : #ifndef EXTERN_C_FUNC_DECLARE_START
     164              : #define EXTERN_C_FUNC_DECLARE_START extern "C" {
     165              : #endif
     166              : 
     167              : EXTERN_C_FUNC_DECLARE_START
     168            0 : auto makePlugin(const std::string& /*unused*/, const fhicl::ParameterSet& pset)
     169              : {
     170            0 :         return std::make_unique<mfplugins::ELKafka>(pset);
     171              : }
     172              : }
     173              : 
     174            0 : DEFINE_BASIC_PLUGINTYPE_FUNC(mf::service::ELdestination)
        

Generated by: LCOV version 2.0-1