我写了一个简单的消费者,每次消费500条消息后,会同步提交偏移量,但是日志显示的偏移量存在明显略过了一部分消息的情况。我的topic只有一个分区。
代码如下:
String[] getMessage(int num) throws Exception{
Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
int count = 0;
if(consumer!=null){
try {
while (count < num) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
message[count] = record.value();
currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset()));
count++;
if(count == num){
break;
}
}
}
consumer.commitSync(currentOffsets);
}catch (Exception e) {
logger.error(String.format("poll message occurs error:%s", e.toString()));
}
}
return message;
}
下面是在主函数中调用这个函数:
while(true){
message = getMessage(500);
}
在日志中出现了如下情况:
[org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]Group test committed offset 187709 for partition topic12-0
[org.apache.kafka.clients.consumer.internals.Fetcher]Ignoring fetched records for topic12-0 at offset 187710 since the current position is 187946
[org.apache.kafka.clients.consumer.internals.Fetcher]Sending fetch for partitions [topic12-0] to broker host-129-152:9092 (id: 45 rack: null)
[org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]Group test committed offset 188209 for partition topic12-0
[org.apache.kafka.clients.consumer.internals.Fetcher]Ignoring fetched records for topic12-0 at offset 188446 since the current position is 188682
[org.apache.kafka.clients.consumer.internals.Fetcher]Sending fetch for partitions [topic12-0] to broker host-129-152:9092 (id: 45 rack: null)
[org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]Group test committed offset 188945 for partition topic12-0
[org.apache.kafka.clients.consumer.internals.Fetcher]Ignoring fetched records for topic12-0 at offset 189182 since the current position is 189418
请问大神这是什么原因造成的呢?