Producer für Apache Kafka (librdkafka++)

Apache Kafka

Code ist größtenteils kopiert von librdkafka auf github. Das ist lediglich ein Producer in C++. Für librdkafka muss folgendes gemacht werden:

  • Konfigurationen erzeugen (global, topic)
  • Broker setzen (eventuell optional?)
  • Producer erzeugen (Angabe der globalen Konfig)
  • Topic erzeugen (Topicname, topic-config)
  • producer→produce ( … )
  • topic und producer löschen
  • rdKafka mitteilen, dass es Speicher freigeben soll.

Hinweis

librdkafka scheint beim bauen (make install) die Shared-Objekts nicht in die entsprechenden Ordner zu verschieben. Linken funktioniert problemlos, beim starten meckert allerdings der Programmlader, dass librdkafka++.so.1 bzw. librdkafka.so.1 nicht gefunden werden können.

Lösung Beide Dateien in das Verzeichnis kopieren und mittels export LD_LIBRARY_PATH=$(pwd) für den Programmlader auffindbar machen.

Code

main.cpp
#include <iostream>
#include <string>
#include <cstdlib>
#include <cstdio>
#include <csignal>
#include <cstring>
 
#include <librdkafka/rdkafkacpp.h>
 
static bool run = true;
 
static void sigterm (int sig) {
  run = false;
}
 
 
class ExampleDeliveryReportCb : public RdKafka::DeliveryReportCb {
 public:
  void dr_cb (RdKafka::Message &message) {
    std::cout << "Message delivery for (" << message.len() << " bytes): " <<
        message.errstr() << std::endl;
  }
};
 
 
class ExampleEventCb : public RdKafka::EventCb {
 public:
  void event_cb (RdKafka::Event &event) {
    switch (event.type())
    {
      case RdKafka::Event::EVENT_ERROR:
        std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): " <<
            event.str() << std::endl;
        if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN)
          run = false;
        break;
 
      case RdKafka::Event::EVENT_STATS:
        std::cerr << "\"STATS\": " << event.str() << std::endl;
        break;
 
      case RdKafka::Event::EVENT_LOG:
        fprintf(stderr, "LOG-%i-%s: %s\n",
                event.severity(), event.fac().c_str(), event.str().c_str());
        break;
 
      default:
        std::cerr << "EVENT " << event.type() <<
            " (" << RdKafka::err2str(event.err()) << "): " <<
            event.str() << std::endl;
        break;
    }
  }
};
 
 
int main () 
{
    std::string errstr;
    std::string topic_str = "test3";
    std::string host = "localhost:9092";
 
    int32_t partition = RdKafka::Topic::PARTITION_UA;
    int64_t start_offset = RdKafka::Topic::OFFSET_BEGINNING;
 
    // create configurations (global and topic)
    RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
 
    // list of brokers
    conf->set("metadata.broker.list", host, errstr);
 
    // event callback
    ExampleEventCb ex_event_cb;
    conf->set("event_cb", &ex_event_cb, errstr);
 
    signal(SIGINT, sigterm);
    signal(SIGTERM, sigterm);
 
    // delivery callback
    ExampleDeliveryReportCb ex_dr_cb;
    conf->set("dr_cb", &ex_dr_cb, errstr);
 
    // create producer
    RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
    if (!producer) {
      std::cerr << "Failed to create producer: " << errstr << std::endl;
      exit(1);
    }
    std::cout << "% Created producer " << producer->name() << std::endl;
 
    // create topic-handle
    RdKafka::Topic *topic = RdKafka::Topic::create(producer, topic_str, tconf, errstr);
    if (!topic) {
      std::cerr << "Failed to create topic: " << errstr << std::endl;
      exit(1);
    }
 
    for (std::string line; run and std::getline(std::cin, line);) 
    {
        if (line.empty()) {
            producer->poll(0);
            continue;
        }
 
      /*
       * Produce message
       */
        RdKafka::ErrorCode resp = producer->produce(topic, partition, RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
                                                    const_cast<char *>(line.c_str()), line.size(), NULL, NULL);
        if (resp != RdKafka::ERR_NO_ERROR)
	        std::cerr << "% Produce failed: " << RdKafka::err2str(resp) << std::endl;
        else
	        std::cerr << "% Produced message (" << line.size() << " bytes)" << std::endl;
        producer->poll(0);
    }
    run = true;
 
    while (run and producer->outq_len() > 0) 
    {
        std::cerr << "Waiting for " << producer->outq_len() << std::endl;
        producer->poll(1000);
    }
 
    delete topic;
    delete producer;
    RdKafka::wait_destroyed(5000);
}
Makefile
all:
	g++ main.cpp -g -lrdkafka++ -lrdkafka -lpthread -lz -lrt