如何排查kafka消费者自动停止的问题?

慕容紫枫 发表于: 2020-02-21   最后更新时间: 2021-10-19 10:47:14   8,158 游览

通过springboot创建的消费者项目 通过 nohup java -jar &启动后,过一段时间进程自动停止了?

这是停止之前仅有的“有异常”的日志,其他的都是正常消费的打印日志
[org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] INFO  org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-1, groupId=c1] Unsubscribed all topics or patterns and assigned partitions
[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO  org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-2, groupId=c2] Unsubscribed all topics or patterns and assigned partitions
[org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] INFO  org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-3, groupId=c3] Unsubscribed all topics or patterns and assigned partitions
[org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] INFO  o.s.scheduling.concurrent.ThreadPoolTaskScheduler - Shutting down ExecutorService
[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO  o.s.scheduling.concurrent.ThreadPoolTaskScheduler - Shutting down ExecutorService
[org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] INFO  o.s.scheduling.concurrent.ThreadPoolTaskScheduler - Shutting down ExecutorService
[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO  o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - c1: Consumer stopped
[org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] INFO  o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - c2: Consumer stopped
[org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] INFO  o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - c3: Consumer stopped
[SpringContextShutdownHook] INFO  o.s.scheduling.concurrent.ThreadPoolTaskExecutor - Shutting down ExecutorService 'applicationTaskExecutor'
kafka集群 有一个topic主题 3个分片
消费者一共创建了三个监听器 每个监听器分别监听不同的分片
@KafkaListener(groupId = "c1" , topicPartitions = { @TopicPartition(topic = "topic", partitions = { "0" }) })
@KafkaListener(groupId = "c2" , topicPartitions = { @TopicPartition(topic = "topic", partitions = { "1" }) })
@KafkaListener(groupId = "c3" , topicPartitions = { @TopicPartition(topic = "topic", partitions = { "2" }) })

请问根据上面仅有的 日志怎么排查下消费者 进程运行中断的原因?

该台服务器配置 4核8G 通过外网链接kafka集群 运行一段时间后springboot 进程就自己结束了

另一个消费者 8核16G 通过外网链接,同样订阅该主题,也是三个监听器,但是一切正常

是不是因为有什么配置导致的 ?kafka配置 或者消费者配置 服务器上kafka集群除了基本修改的地方 其他大多数配置 都是用的默认的配置 消费者也基本都是默认的配置)

或者是springboot 项目启动后经过一段时间 内存溢出导致进程终止?

麻烦大佬帮看下 有没有什么怀疑点,或者定位排查方法,感谢。

发表于 2020-02-21
添加评论

看下os系统日志,搜下是否有OOM或者kill相关的日志。
日志目录'/var/log/messages'

你的答案

查看kafka相关的其他问题或提一个您自己的问题