====== Producer für Apache Kafka (librdkafka++) ======

[[http://kafka.apache.org|Apache Kafka]]

Der Code ist größtenteils von [[https://github.com/edenhill/librdkafka|librdkafka auf GitHub]] kopiert. Das ist lediglich ein Producer in C++.

Für librdkafka muss Folgendes gemacht werden:

  * Konfigurationen erzeugen (global, topic)
  * Broker setzen (//eventuell optional?//)
  * Producer erzeugen (Angabe der globalen Konfig)
  * Topic erzeugen (Topicname, topic-config)
  * producer->produce ( ... )
  * Topic und Producer löschen
  * rdKafka mitteilen, dass es Speicher freigeben soll.

===== Hinweis =====

librdkafka scheint beim Bauen (''make install'') die Shared Objects nicht in die entsprechenden Ordner zu verschieben. Das Linken funktioniert problemlos, beim Starten meckert allerdings der Programmlader, dass librdkafka++.so.1 bzw. librdkafka.so.1 nicht gefunden werden können.

**Lösung** Beide Dateien in das Verzeichnis des Programms kopieren und mittels ''export LD_LIBRARY_PATH=$(pwd)'' für den Programmlader auffindbar machen.
===== Code ===== #include #include #include #include #include #include #include static bool run = true; static void sigterm (int sig) { run = false; } class ExampleDeliveryReportCb : public RdKafka::DeliveryReportCb { public: void dr_cb (RdKafka::Message &message) { std::cout << "Message delivery for (" << message.len() << " bytes): " << message.errstr() << std::endl; } }; class ExampleEventCb : public RdKafka::EventCb { public: void event_cb (RdKafka::Event &event) { switch (event.type()) { case RdKafka::Event::EVENT_ERROR: std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): " << event.str() << std::endl; if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN) run = false; break; case RdKafka::Event::EVENT_STATS: std::cerr << "\"STATS\": " << event.str() << std::endl; break; case RdKafka::Event::EVENT_LOG: fprintf(stderr, "LOG-%i-%s: %s\n", event.severity(), event.fac().c_str(), event.str().c_str()); break; default: std::cerr << "EVENT " << event.type() << " (" << RdKafka::err2str(event.err()) << "): " << event.str() << std::endl; break; } } }; int main () { std::string errstr; std::string topic_str = "test3"; std::string host = "localhost:9092"; int32_t partition = RdKafka::Topic::PARTITION_UA; int64_t start_offset = RdKafka::Topic::OFFSET_BEGINNING; // create configurations (global and topic) RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC); // list of brokers conf->set("metadata.broker.list", host, errstr); // event callback ExampleEventCb ex_event_cb; conf->set("event_cb", &ex_event_cb, errstr); signal(SIGINT, sigterm); signal(SIGTERM, sigterm); // delivery callback ExampleDeliveryReportCb ex_dr_cb; conf->set("dr_cb", &ex_dr_cb, errstr); // create producer RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr); if (!producer) { std::cerr << "Failed to create producer: " << errstr << std::endl; exit(1); } std::cout << "% Created producer " << 
producer->name() << std::endl; // create topic-handle RdKafka::Topic *topic = RdKafka::Topic::create(producer, topic_str, tconf, errstr); if (!topic) { std::cerr << "Failed to create topic: " << errstr << std::endl; exit(1); } for (std::string line; run and std::getline(std::cin, line);) { if (line.empty()) { producer->poll(0); continue; } /* * Produce message */ RdKafka::ErrorCode resp = producer->produce(topic, partition, RdKafka::Producer::RK_MSG_COPY /* Copy payload */, const_cast(line.c_str()), line.size(), NULL, NULL); if (resp != RdKafka::ERR_NO_ERROR) std::cerr << "% Produce failed: " << RdKafka::err2str(resp) << std::endl; else std::cerr << "% Produced message (" << line.size() << " bytes)" << std::endl; producer->poll(0); } run = true; while (run and producer->outq_len() > 0) { std::cerr << "Waiting for " << producer->outq_len() << std::endl; producer->poll(1000); } delete topic; delete producer; RdKafka::wait_destroyed(5000); } all: g++ main.cpp -g -lrdkafka++ -lrdkafka -lpthread -lz -lrt