返回到文章

采纳

编辑于

kakfa简单消费消息信息丢失问题Ignoring fetched records for topic12-0 at offset 187710 since the current position is 187946

kafka

我写了一个简单的消费者,每次消费500条消息后,会同步提交偏移量,但是日志显示的偏移量存在明显略过了一部分消息的情况。我的topic只有一个分区。
代码如下:


String[] getMessage(int num) throws Exception{
        Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
        int count = 0;
        if(consumer!=null){
            try {
                while (count < num) {
                    ConsumerRecords<String, String> records = consumer.poll(1000);
                    for (ConsumerRecord<String, String> record : records) {
                        message[count] = record.value();
                        currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
                                new OffsetAndMetadata(record.offset()));
                        count++;
                        if(count == num){
                            break;
                        }
                    }
                }
                consumer.commitSync(currentOffsets);
            }catch (Exception e) {
                logger.error(String.format("poll message occurs error:%s", e.toString()));
            }
        }
        return message;
    }

下面是在主函数中调用这个函数:

while(true){
   message = getMessage(500);
}

在日志中出现了如下情况:

[org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]Group test committed offset 187709 for partition topic12-0
[org.apache.kafka.clients.consumer.internals.Fetcher]Ignoring fetched records for topic12-0 at offset 187710 since the current position is 187946
[org.apache.kafka.clients.consumer.internals.Fetcher]Sending fetch for partitions [topic12-0] to broker host-129-152:9092 (id: 45 rack: null)
[org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]Group test committed offset 188209 for partition topic12-0
[org.apache.kafka.clients.consumer.internals.Fetcher]Ignoring fetched records for topic12-0 at offset 188446 since the current position is 188682
[org.apache.kafka.clients.consumer.internals.Fetcher]Sending fetch for partitions [topic12-0] to broker host-129-152:9092 (id: 45 rack: null)
[org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]Group test committed offset 188945 for partition topic12-0
[org.apache.kafka.clients.consumer.internals.Fetcher]Ignoring fetched records for topic12-0 at offset 189182 since the current position is 189418

请问大神这是什么原因造成的呢?