返回到文章

采纳

编辑于

基于librdkafka的c++消费者封装

kafka

有两个问题:

1. 本例中使用的是高级消费者api : KafkaConsumer。源码rdkafkacpp.h里面说调用consume消费消息时,会自动调用reblance事件回调,为什么我设置reblance消息回调没有被调用到:

  /**
   class RD_EXPORT KafkaConsumer : public virtual Handle {
   ...
   * @brief Consume message or get error event, triggers callbacks.
   * Will automatically call registered callbacks for any such queued events,
   * including RdKafka::RebalanceCb, RdKafka::EventCb, RdKafka::OffsetCommitCb,
   * etc.
   * ....
   virtual Message *consume (int timeout_ms) = 0;
   ...
   }
   */


注:继承RdKafka::RebalanceCb就可以实现回调

namespace EDU_SP_KAFKA {
class KafkaRebalanceCb : public RdKafka::RebalanceCb {
public:
        KafkaRebalanceCb() = default;
        ~KafkaRebalanceCb() = default;
        virtual void rebalance_cb(RdKafka::KafkaConsumer *consumer,
                                  RdKafka::ErrorCode err,
                                  std::vector<RdKafka::TopicPartition*> &partitions) override;
}

int KafkaConsumeImp::initConsumer() {
    std::string errstr;
    // create KafkaRebalanceCb & KafkaEventCb callback object
    kafka_event_cb_ = std::make_shared<EDU_SP_KAFKA::KafkaEventCb>();
    kafka_reblance_cb_ = std::shared_ptr<EDU_SP_KAFKA::KafkaRebalanceCb>();

    //TODO:好像设置的回调没起作用
    kafka_conf_->set("event_cb", kafka_event_cb_.get(), errstr);
    kafka_conf_->set("rebalance_cb", kafka_reblance_cb_.get(), errstr)
    ...};

2. 我是基于librdkafka c++库的基础上再封装了一层sdk提供给上层业务来使用 ,开单独的线程去消费消息,并且模拟了一下业务demo调用,消息消费正常。想问一下进程启动之后各个线程是做什么的,线程启动如下:

PID USER      PR  NI    VIRT    RES    SHR S %CPU %MEM     TIME+ COMMAND 
42545 lvfuchao  20   0  473888   4496   4136 S  0.0  0.0   0:00.00 kafka-consumer-
42546 lvfuchao  20   0  473888   4496   4136 S  0.0  0.0   0:00.01 rdk:broker-1
42547 lvfuchao  20   0  473888   4496   4136 S  0.0  0.0   0:00.78 rdk:main
42548 lvfuchao  20   0  473888   4496   4136 S  0.0  0.0   0:00.00 rdk:broker-1
42549 lvfuchao  20   0  473888   4496   4136 S  0.0  0.0   0:00.30 rdk:broker1
42550 lvfuchao  20   0  473888   4496   4136 S  0.0  0.0   0:00.28 rdk:broker2
42551 lvfuchao  20   0  473888   4496   4136 S  0.0  0.0   0:00.61 rdk:broker3
42552 lvfuchao  20   0  473888   4496   4136 S  0.0  0.0   0:00.00 kafka-consumer-

我的理解是:
(1) broke1、broker2、broker3:是分别用来和三个kafka集群进行交互的
(2)42545(kafka-consumer-):是消息事件回调:RdKafka::RebalanceCb, RdKafka::EventCb, RdKafka::OffsetCommitCb,
(3)42552(kafka-consumer-):是开的单个线程去消费消息
(4) 还有两个broker-1不知道做什么用的

线程消费消息代码如下:

bool KafkaConsumeImp::start_consumer() {
    // begin consume thread
    thread_ = std::make_unique<std::thread>([this] {
        this->start();
    });}

bool KafkaConsumeImp::start() {
    /*Consume messages*/
    if(started_) {
        return -1;
    }
    if(!kafka_consumer_) {
        return -2;
    }
    try {
        while (EDU_SP_KAFKA::run) {
            RdKafka::Message *msg = kafka_consumer_->consume(6000);
            consumer_cb(msg, nullptr);
    } catch (std::exception &e) {
        std::cout << "Execption: consuemer_ start failure !!!" << std::endl;
        return -4;
    }
    started_ = true;
    return true;
}