#include #include #include #include #include #include #include static bool run = true; static void sigterm (int sig) { run = false; } class ExampleDeliveryReportCb : public RdKafka::DeliveryReportCb { public: void dr_cb (RdKafka::Message &message) { std::cout << "Message delivery for (" << message.len() << " bytes): " << message.errstr() << std::endl; } }; class ExampleEventCb : public RdKafka::EventCb { public: void event_cb (RdKafka::Event &event) { switch (event.type()) { case RdKafka::Event::EVENT_ERROR: std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): " << event.str() << std::endl; if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN) run = false; break; case RdKafka::Event::EVENT_STATS: std::cerr << "\"STATS\": " << event.str() << std::endl; break; case RdKafka::Event::EVENT_LOG: fprintf(stderr, "LOG-%i-%s: %s\n", event.severity(), event.fac().c_str(), event.str().c_str()); break; default: std::cerr << "EVENT " << event.type() << " (" << RdKafka::err2str(event.err()) << "): " << event.str() << std::endl; break; } } }; int main () { std::string errstr; std::string topic_str = "test3"; std::string host = "localhost:9092"; int32_t partition = RdKafka::Topic::PARTITION_UA; int64_t start_offset = RdKafka::Topic::OFFSET_BEGINNING; // create configurations (global and topic) RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC); // list of brokers conf->set("metadata.broker.list", host, errstr); // event callback ExampleEventCb ex_event_cb; conf->set("event_cb", &ex_event_cb, errstr); signal(SIGINT, sigterm); signal(SIGTERM, sigterm); // delivery callback ExampleDeliveryReportCb ex_dr_cb; conf->set("dr_cb", &ex_dr_cb, errstr); // create producer RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr); if (!producer) { std::cerr << "Failed to create producer: " << errstr << std::endl; exit(1); } std::cout << "% Created producer " << producer->name() << std::endl; // create topic-handle RdKafka::Topic *topic = RdKafka::Topic::create(producer, topic_str, tconf, errstr); if (!topic) { std::cerr << "Failed to create topic: " << errstr << std::endl; exit(1); } for (std::string line; run and std::getline(std::cin, line);) { if (line.empty()) { producer->poll(0); continue; } /* * Produce message */ RdKafka::ErrorCode resp = producer->produce(topic, partition, RdKafka::Producer::RK_MSG_COPY /* Copy payload */, const_cast(line.c_str()), line.size(), NULL, NULL); if (resp != RdKafka::ERR_NO_ERROR) std::cerr << "% Produce failed: " << RdKafka::err2str(resp) << std::endl; else std::cerr << "% Produced message (" << line.size() << " bytes)" << std::endl; producer->poll(0); } run = true; while (run and producer->outq_len() > 0) { std::cerr << "Waiting for " << producer->outq_len() << std::endl; producer->poll(1000); } delete topic; delete producer; RdKafka::wait_destroyed(5000); }