/**
class RD_EXPORT KafkaConsumer : public virtual Handle {
...
* @brief Consume message or get error event, triggers callbacks.
* Will automatically call registered callbacks for any such queued events,
* including RdKafka::RebalanceCb, RdKafka::EventCb, RdKafka::OffsetCommitCb,
* etc.
* ....
virtual Message *consume (int timeout_ms) = 0;
...
}
*/
注:继承RdKafka::RebalanceCb就可以实现回调
namespace EDU_SP_KAFKA {
class KafkaRebalanceCb : public RdKafka::RebalanceCb {
public:
KafkaRebalanceCb() = default;
~KafkaRebalanceCb() = default;
virtual void rebalance_cb(RdKafka::KafkaConsumer *consumer,
RdKafka::ErrorCode err,
std::vector<RdKafka::TopicPartition*> &partitions) override;
}
int KafkaConsumeImp::initConsumer() {
std::string errstr;
// create KafkaRebalanceCb & KafkaEventCb callback object
kafka_event_cb_ = std::make_shared<EDU_SP_KAFKA::KafkaEventCb>();
kafka_reblance_cb_ = std::shared_ptr<EDU_SP_KAFKA::KafkaRebalanceCb>();
//TODO:好像设置的回调没起作用
kafka_conf_->set("event_cb", kafka_event_cb_.get(), errstr);
kafka_conf_->set("rebalance_cb", kafka_reblance_cb_.get(), errstr)
...};
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
42545 lvfuchao 20 0 473888 4496 4136 S 0.0 0.0 0:00.00 kafka-consumer-
42546 lvfuchao 20 0 473888 4496 4136 S 0.0 0.0 0:00.01 rdk:broker-1
42547 lvfuchao 20 0 473888 4496 4136 S 0.0 0.0 0:00.78 rdk:main
42548 lvfuchao 20 0 473888 4496 4136 S 0.0 0.0 0:00.00 rdk:broker-1
42549 lvfuchao 20 0 473888 4496 4136 S 0.0 0.0 0:00.30 rdk:broker1
42550 lvfuchao 20 0 473888 4496 4136 S 0.0 0.0 0:00.28 rdk:broker2
42551 lvfuchao 20 0 473888 4496 4136 S 0.0 0.0 0:00.61 rdk:broker3
42552 lvfuchao 20 0 473888 4496 4136 S 0.0 0.0 0:00.00 kafka-consumer-
我的理解是:
(1) broke1、broker2、broker3:是分别用来和三个kafka集群进行交互的
(2)42545(kafka-consumer-):是消息事件回调:RdKafka::RebalanceCb, RdKafka::EventCb, RdKafka::OffsetCommitCb,
(3)42552(kafka-consumer-):是开的单个线程去消费消息
(4) 还有两个broker-1不知道做什么用的
bool KafkaConsumeImp::start_consumer() {
// begin consume thread
thread_ = std::make_unique<std::thread>([this] {
this->start();
});}
bool KafkaConsumeImp::start() {
/*Consume messages*/
if(started_) {
return -1;
}
if(!kafka_consumer_) {
return -2;
}
try {
while (EDU_SP_KAFKA::run) {
RdKafka::Message *msg = kafka_consumer_->consume(6000);
consumer_cb(msg, nullptr);
} catch (std::exception &e) {
std::cout << "Execption: consuemer_ start failure !!!" << std::endl;
return -4;
}
started_ = true;
return true;
}