java kafka consumer.poll在运行一段时间后出现死循环,无法获取数据

MADAO 发表于: 2019-10-25   最后更新时间: 2019-10-25 15:12:42   5,167 游览

kafka服务端显示断开与消费者链接,但是消费者没有任何异常;

[2019-10-25 04:41:28,835] INFO [GroupCoordinator 0]: Member consumer-3-c80f9b26-ce89-4685-8afa-d5766e32fd47 in group DataAcquireGroup has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2019-10-25 04:41:28,835] INFO [GroupCoordinator 0]: Preparing to rebalance group DataAcquireGroup in state PreparingRebalance with old generation 30 (__consumer_offsets-33) (reason: removing member consumer-3-c80f9b26-ce89-4685-8afa-d5766e32fd47 on heartbeat expiration) (kafka.coordinator.group.GroupCoordinator)
[2019-10-25 04:41:28,965] INFO [GroupCoordinator 0]: Member consumer-4-36f437e5-186b-470f-92cd-f02483b8fa7d in group DataAcquireGroup has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2019-10-25 04:41:28,975] INFO [GroupCoordinator 0]: Member consumer-1-a0eaa01b-1fdf-40be-a9d3-b7e527bcb7fe in group DataAcquireGroup has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2019-10-25 04:41:28,987] INFO [GroupCoordinator 0]: Member consumer-5-cf609397-08a1-4eef-b6cf-19a63ad24ad3 in group DataAcquireGroup has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2019-10-25 04:41:28,989] INFO [GroupCoordinator 0]: Member consumer-2-331356e1-34ee-49a7-b352-ef2729a001bb in group DataAcquireGroup has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)

消费者代码:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records){
      ToolsUtil.linkedBlockingQueue.put(record);
    }
}

消费者配置:

bootstrap.servers=127.0.0.1:9092
group.id=DataAcquireGrouphuihui
enable.auto.commit=true
auto.commit.interval.ms=1000
max.poll.interval.ms=60000
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
max.poll.records=100

消费者正常运作10+小时后出现问题,且没有任何异常或者是警告输出 生产者运作正常

发表于 2019-10-25
添加评论

一切正常,进程被杀,一般是oom引起的,系统日志中会有。
另外,你启动的消费者的程序是守护进程吧。

MADAO -> 半兽人 5年前

首先进程没有被杀死,查看日志发现没有任何输出,CPU使用暴涨至100%,启动的程序是普通java程序,通过打印信息排查,执行到consumer.poll(100);后程序不动了,并没有被杀死,望大佬帮忙看看

半兽人 -> MADAO 5年前

consumer.poo(100) 是长轮询,每个100ms去kafka拉取消息。
cpu100% 要找出哪个线程跑到100%了。
可参考:https://www.orchome.com/833

m -> MADAO 2年前

请问下有排查到原因吗

你的答案

查看kafka相关的其他问题或提一个您自己的问题