I still think you have to use a timestamp to look up the offset position, because the client has no way of knowing what the current offset is.
The example in the source code just sets offset = RD_KAFKA_OFFSET_BEGINNING and that is supposed to be enough, but it didn't really work when I tested it. Here are the source comments on RD_KAFKA_OFFSET_END and the related offset constants:
#define RD_KAFKA_OFFSET_BEGINNING                                              \
        -2 /**< Start consuming from beginning of                              \
            *   kafka partition queue: oldest msg */
#define RD_KAFKA_OFFSET_END                                                    \
        -1 /**< Start consuming from end of kafka                              \
            *   partition queue: next msg */
#define RD_KAFKA_OFFSET_STORED                                                 \
        -1000 /**< Start consuming from offset retrieved                       \
               *   from offset store */
#define RD_KAFKA_OFFSET_INVALID -1001 /**< Invalid offset */
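For context, a minimal sketch of where these logical offsets normally apply: as far as I know, setting them per partition only takes effect when you assign the partitions yourself with rd_kafka_assign(); with rd_kafka_subscribe() and a consumer group, the starting position for partitions without a committed offset is controlled by the auto.offset.reset configuration instead. The handle rk, the topic "my_topic" and partition 0 below are placeholders, not names from this thread:

#include <librdkafka/rdkafka.h>

/* Sketch only: the logical offsets above take effect when partitions are assigned manually.
 * rk is an existing consumer handle; "my_topic" and partition 0 are placeholder values. */
static void assign_from_beginning(rd_kafka_t *rk) {
    rd_kafka_topic_partition_list_t *parts = rd_kafka_topic_partition_list_new(1);
    rd_kafka_topic_partition_list_add(parts, "my_topic", 0)->offset = RD_KAFKA_OFFSET_BEGINNING;
    rd_kafka_assign(rk, parts); /* manual assignment honours the per-partition offset field */
    rd_kafka_topic_partition_list_destroy(parts);
}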
 
I don't see the logic where you look up the offset by timestamp.
I did find it, but no matter how I set it, it never takes effect, which is frustrating. I don't know where the problem is or what else still needs to be set. My idea was to finish this setup before subscribing; the code is as follows:
bool MessageQueueConsumer::subscribeAction() {
    rd_kafka_resp_err_t err;
    rd_kafka_topic_partition_list_t *subscription;
    subscription = rd_kafka_topic_partition_list_new(_topocList.size());
    pthread_mutex_lock(&_mutex);
    for (int count = 0; count < _topocList.size(); count++) {
        rd_kafka_topic_partition_list_add(subscription, _topocList[count].c_str(),
                                          count);
    }
    pthread_mutex_unlock(&_mutex);
    for (int count = 0; count < subscription->cnt; count++) {
        subscription->elems[count].offset = RD_KAFKA_OFFSET_END;
    }
    rd_kafka_seek_partitions(_handler, subscription, 1000);
    err = rd_kafka_subscribe(_handler, subscription);
    if (err) {
        // error logging: todo
        rd_kafka_topic_partition_list_destroy(subscription);
        return false;
    }
    rd_kafka_topic_partition_list_destroy(subscription);
    return true;
}
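One possible reason the offsets set above are ignored: with rd_kafka_subscribe() the actual partition assignment only happens later, during the group rebalance, so anything written into the subscription list's offset fields before subscribing is not used (and, as far as I understand, rd_kafka_seek_partitions() only works on partitions that are already being consumed). A common librdkafka pattern is to set the start offsets in a rebalance callback, right before calling rd_kafka_assign(). The sketch below is illustrative only, not a drop-in replacement for the code above, and assumes the callback is registered on the rd_kafka_conf_t before the consumer handle is created:

// Sketch: choose the start offset at assignment time via a rebalance callback.
static void rebalance_cb(rd_kafka_t *rk, rd_kafka_resp_err_t err,
                         rd_kafka_topic_partition_list_t *partitions,
                         void *opaque) {
    if (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS) {
        // Partitions are being assigned: decide where to start reading.
        for (int i = 0; i < partitions->cnt; i++)
            partitions->elems[i].offset = RD_KAFKA_OFFSET_END;
        rd_kafka_assign(rk, partitions);
    } else {
        // Revocation (or an error): drop the current assignment.
        rd_kafka_assign(rk, NULL);
    }
}

// During consumer setup, before rd_kafka_new():
//     rd_kafka_conf_set_rebalance_cb(conf, rebalance_cb);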
 
Yes, you can remove group.id; for reference:
Another approach is to reset the offset, in two steps: 1. use a timestamp to look up the offset, 2. seek to that offset. I haven't used the C++ client myself, but the two key Java calls are below; they may help you find the corresponding C++ code:
consumer.offsetsForTimes  // locate the offset by timestamp
consumer.seek             // reposition to that offset
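If it helps, the rough librdkafka C counterparts seem to be rd_kafka_offsets_for_times() and rd_kafka_seek_partitions(). A minimal, untested sketch; rk is an existing consumer handle, and "my_topic", partition 0 and the timestamp value are placeholders:

#include <librdkafka/rdkafka.h>

/* Sketch: seek an already-assigned consumer to the offsets matching a timestamp. */
static void seek_to_time(rd_kafka_t *rk, int64_t timestamp_ms) {
    rd_kafka_topic_partition_list_t *parts = rd_kafka_topic_partition_list_new(1);
    /* On input to rd_kafka_offsets_for_times() the offset field carries the timestamp (ms). */
    rd_kafka_topic_partition_list_add(parts, "my_topic", 0)->offset = timestamp_ms;

    /* Step 1: resolve the timestamp to an offset (the offset field is overwritten). */
    if (rd_kafka_offsets_for_times(rk, parts, 5000) == RD_KAFKA_RESP_ERR_NO_ERROR) {
        /* Step 2: seek the live consumer; the partitions must already be assigned. */
        rd_kafka_error_t *err = rd_kafka_seek_partitions(rk, parts, 5000);
        if (err)
            rd_kafka_error_destroy(err); /* inspect/log the error in real code */
    }
    rd_kafka_topic_partition_list_destroy(parts);
}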