====== Producer für Apache Kafka (librdkafka++) ======
[[http://kafka.apache.org|Apache Kafka]]
Der Code ist größtenteils von [[https://github.com/edenhill/librdkafka|librdkafka auf GitHub]] kopiert. Das ist lediglich ein Producer in C++. Für librdkafka muss Folgendes gemacht werden:
* Konfigurationen erzeugen (global, topic)
* Broker setzen (//eventuell optional?//)
* Producer erzeugen (Angabe der globalen Konfig)
* Topic erzeugen (Topicname, topic-config)
* producer->produce ( ... )
* topic und producer löschen
* rdKafka mitteilen, dass es Speicher freigeben soll.
===== Hinweis =====
librdkafka scheint beim Bauen (''make install'') die Shared Objects nicht in die entsprechenden Ordner zu verschieben. Das Linken funktioniert problemlos, beim Starten meckert allerdings der Programmlader, dass librdkafka++.so.1 bzw. librdkafka.so.1 nicht gefunden werden können.
**Lösung:** Beide Dateien in das aktuelle Arbeitsverzeichnis kopieren und mittels ''export LD_LIBRARY_PATH=$(pwd)'' für den Programmlader auffindbar machen.
===== Code =====
#include <csignal>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <string>
#include <librdkafka/rdkafkacpp.h>
// Shutdown flag shared between the signal handler, the event callback and
// main(). It is written from a signal handler, so it has to be a
// `volatile std::sig_atomic_t`: that is the only object type the C++ standard
// allows a handler to write portably. Non-zero means "keep running".
static volatile std::sig_atomic_t run = 1;

// Signal handler: requests shutdown by clearing `run`.
// The signal number is not needed, hence the unnamed parameter.
static void sigterm (int /* sig */) {
  run = 0;
}
// Delivery-report callback (registered in main() under the "dr_cb" property).
// For every reported message it writes one line to stdout containing the
// payload length in bytes and the message's error string.
class ExampleDeliveryReportCb : public RdKafka::DeliveryReportCb {
 public:
  // message: the message whose delivery outcome is being reported.
  void dr_cb (RdKafka::Message &message) {
    std::cout << "Message delivery for (" << message.len() << " bytes): ";
    std::cout << message.errstr() << std::endl;
  }
};
// Event callback (registered in main() under the "event_cb" property).
// Every event is written to stderr in a format that depends on its type.
// An error event reporting ERR__ALL_BROKERS_DOWN additionally clears the
// file-level `run` flag.
class ExampleEventCb : public RdKafka::EventCb {
 public:
  // event: the event to report.
  void event_cb (RdKafka::Event &event) {
    const RdKafka::Event::Type kind = event.type();

    if (kind == RdKafka::Event::EVENT_ERROR) {
      std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): "
                << event.str() << std::endl;
      // Without any reachable broker there is no point in going on.
      if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN) {
        run = false;
      }
      return;
    }

    if (kind == RdKafka::Event::EVENT_STATS) {
      std::cerr << "\"STATS\": " << event.str() << std::endl;
      return;
    }

    if (kind == RdKafka::Event::EVENT_LOG) {
      fprintf(stderr, "LOG-%i-%s: %s\n",
              event.severity(), event.fac().c_str(), event.str().c_str());
      return;
    }

    // Any other event type: print its numeric type, error and text.
    std::cerr << "EVENT " << kind
              << " (" << RdKafka::err2str(event.err()) << "): "
              << event.str() << std::endl;
  }
};
int main ()
{
std::string errstr;
std::string topic_str = "test3";
std::string host = "localhost:9092";
int32_t partition = RdKafka::Topic::PARTITION_UA;
int64_t start_offset = RdKafka::Topic::OFFSET_BEGINNING;
// create configurations (global and topic)
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
// list of brokers
conf->set("metadata.broker.list", host, errstr);
// event callback
ExampleEventCb ex_event_cb;
conf->set("event_cb", &ex_event_cb, errstr);
signal(SIGINT, sigterm);
signal(SIGTERM, sigterm);
// delivery callback
ExampleDeliveryReportCb ex_dr_cb;
conf->set("dr_cb", &ex_dr_cb, errstr);
// create producer
RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
if (!producer) {
std::cerr << "Failed to create producer: " << errstr << std::endl;
exit(1);
}
std::cout << "% Created producer " << producer->name() << std::endl;
// create topic-handle
RdKafka::Topic *topic = RdKafka::Topic::create(producer, topic_str, tconf, errstr);
if (!topic) {
std::cerr << "Failed to create topic: " << errstr << std::endl;
exit(1);
}
for (std::string line; run and std::getline(std::cin, line);)
{
if (line.empty()) {
producer->poll(0);
continue;
}
/*
* Produce message
*/
RdKafka::ErrorCode resp = producer->produce(topic, partition, RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
const_cast(line.c_str()), line.size(), NULL, NULL);
if (resp != RdKafka::ERR_NO_ERROR)
std::cerr << "% Produce failed: " << RdKafka::err2str(resp) << std::endl;
else
std::cerr << "% Produced message (" << line.size() << " bytes)" << std::endl;
producer->poll(0);
}
run = true;
while (run and producer->outq_len() > 0)
{
std::cerr << "Waiting for " << producer->outq_len() << std::endl;
producer->poll(1000);
}
delete topic;
delete producer;
RdKafka::wait_destroyed(5000);
}
# Builds main.cpp with debug info and links it against librdkafka++/librdkafka
# and their system dependencies. No -o is given, so g++ writes ./a.out.
# The recipe line must start with a TAB character.
.PHONY: all
all:
	g++ main.cpp -g -lrdkafka++ -lrdkafka -lpthread -lz -lrt