轻骑踏红尘

0 声望

这家伙太懒,什么都没留下

个人动态
  • 半兽人 回复 轻骑踏红尘c++ 调用kafka,rd_kafka_conf_set接口设置"auto.offset.reset"属性问题 中 :

    我觉得你还是得通过时间获取offset的位置,因为客户端是无法得知当前offset的位置。

    3年前
  • 轻骑踏红尘 回复 半兽人c++ 调用kafka,rd_kafka_conf_set接口设置"auto.offset.reset"属性问题 中 :

    我看源码中例子是设置了offset = RD_KAFKA_OFFSET_BEGINNING就可以了,可是我测试的时候不太行,这是源码中对RD_KAFKA_OFFSET_END字段的注解:

    #define RD_KAFKA_OFFSET_BEGINNING                                              \
            -2 /**< Start consuming from beginning of                              \
                *   kafka partition queue: oldest msg */
    #define RD_KAFKA_OFFSET_END                                                    \
            -1 /**< Start consuming from end of kafka                              \
                *   partition queue: next msg */
    #define RD_KAFKA_OFFSET_STORED                                                 \
            -1000 /**< Start consuming from offset retrieved                       \
                   *   from offset store */
    #define RD_KAFKA_OFFSET_INVALID -1001 /**< Invalid offset */
    
    3年前
  • 半兽人 回复 轻骑踏红尘c++ 调用kafka,rd_kafka_conf_set接口设置"auto.offset.reset"属性问题 中 :

    没看到你通过时间获取offset的逻辑。

    3年前
  • 轻骑踏红尘 回复 半兽人c++ 调用kafka,rd_kafka_conf_set接口设置"auto.offset.reset"属性问题 中 :

    找是找到了,可是不管怎么设置都没生效,难受呀,不知道问题在哪里,还需要设置其它什么,我的想法是在订阅前把这个设置完成,代码如下:

    bool MessageQueueConsumer::subscribeAction(){
        rd_kafka_resp_err_t err;
        rd_kafka_topic_partition_list_t *subscription; 
    
        subscription = rd_kafka_topic_partition_list_new(_topocList.size());
       pthread_mutex_lock(&_mutexboolessageQueueConsumer::subscribeActionbool MessageQueueConsumer::subscribeAction(){
        rd_kafka_resp_err_t err;
        rd_kafka_topic_partition_list_t *subscription; 
    
        subscription = rd_kafka_topic_partition_list_new(_topocList.size());
       pthread_mutex_lock(&_mutex);
       for(int count =  0; count; count <  _topocList.size(); count ++){
            rd_kafka_topic_partition_list_add(subscription,_topocList[count].c_str(),
            count);
       }
       pthread_mutex_unlock(&_mutex);
       for(int; count <  _topocList.size(); count ++){
            rd_kafka_topic_partition_list_add; count <  _topocList.size(); count ++){
            rd_kafka_topic_partition_list_add(subscription,_topocList[count].c_str(),
            count; count <  _topocList.size(); count ++){
            rd_kafka_topic_partition_list_addount <  _topocList.size(); count ++){
            rd_kafka_topic_partition_list_add(subscription; count <  _topocList.size(); count ++){
            rd_kafka_topic_partition_list_add(subscription,_topocList[count].c_str(),
            count);
       }
       pthread_mutex_unlock(&_mutex);
       for(int countt <  _topocListt ++){
            rd_kafka_topic_partition_list_add(subscription,_topocList[count].c_str; count <  _topocList.size(); count ++){
            rd_kafka_topic_partition_list_add; count <  _topocList.size(); count ++){
            rd_kafka_topic_partition_list_add(subscription,_topocList[count].c_str(),
            count; count <  _topocList.size(); count; count <  _topocList.size(); count ++){
            rd_kafka_topic_partition_list_add(subscription,_topocList[count].c_str(),
            count);
       }
       pthread_mutex_unlock(&_mutex);
       for(int countt <  _topocListt ++){
            rd_kafka_topic_partition_list_add(subscription,_topocList[count].c_str(),
            count; count <  _topocList.size(); count ++){
            rd_kafka_topic_partition_list_add(subscription,_topocList[count; count <  _topocList.size(); count ++){
            rd_kafka_topic_partition_list_add(subscription; count <  _topocList.size(); count ++){
            rd_kafka_topic_partition_list_add; count <  _topocList.size(); count ++){
            rd_kafka_topic_partition_list_add(subscription,_topocList[count].c_str(),
            count);
       }
       pthread_mutex_unlock(&_mutex);
       for(int count =  0; count <  subscription->cnt; count; count <  subscription->cnt; count ++){
            subscription->elems; count <  subscription->cnt; count ++){
            subscription; count <  subscription->cnt; count ++){
            subscriptionount <  subscription->cnt; count ++){
            subscription->elems; count <  subscription->cnt; count ++){
            subscription->elems[count].offset = RD_KAFKA_OFFSET_END;
       }
       rd_kafka_seek_partitions(_handler,subscription, 1000);
       err);
       err);
       err = rd_kafka_subscribe(_handler, subscription);
       if);
       err = rd_kafka_subscribe(_handler, subscription);
       if (err) {
            //错误日志记录 todo
            rd_kafka_topic_partition_list_destroy   rd_kafka_topic_partition_list_destroy
            rd_kafka_topic_partition_list_destroy(subscription);
            return
            rd_kafka_topic_partition_list_destroy(subscription);
            return false
            rd_kafka_topic_partition_list_destroy(subscription);
            return false;
        }
        rd_kafka_topic_partition_list_destroy(subscription);
        return
            rd_kafka_topic_partition_list_destroy(subscription);
            return false
            rd_kafka_topic_partition_list_destroy(subscription);
            return false;
        }
        rd_kafka_topic_partition_list_destroy(subscription);
        return true;
    }
    
    3年前
  • 半兽人 回复 轻骑踏红尘c++ 调用kafka,rd_kafka_conf_set接口设置"auto.offset.reset"属性问题 中 :

    是可以删除group.id,可参考:

    另外一种方式是重置offset,分2步:

    1. 通过时间来获取到offset
    2. 消费者重置offset

    c++的我没用过,java的关键的2个命令如下,可以帮助你找找对应c++的代码:

    • consumer.offsetsForTimes // 通过时间,定位offset
    • consumer.seek // 重新定位offset
    3年前