返回到文章

采纳

编辑于 4年前

请教下kafka消费速度问题

kafka

您好,有个问题请教下。

环境:

kafka版本2.6.0,单节点(centOS7.6,4核16G),主题名“Topic1”,共30个分区,

自定义containerFactory命名为“batchFactory”,concurrency=30,其他配置均为spring-kafka.2.3.4.RELEASE默认,消费者组目前一共有4个。

问题描述:

由于业务对实时性要求比较高,且数据在整点前后几分钟时数据相对较大,目前整点数据量在2000条/每秒,根据对kafka写入时间与消费者读取消息时间记录后做对比,大部分数据在kafka写入50ms后便由消费者拉取,但某些分区偶尔会存在1-3秒延迟后才由消费者拉取(大多集中在数据量大时,30万条左右数据出现200条大于1秒延迟的数据,最大的将近4秒),请问下:

1)后续应该做哪些优化能提高消费速度?
2)kafka搭建集群会不会提升消费速度?
3)如果消费者做3节点集群,30个分区,每个节点设置10个线程,还是30个线程?

感谢。

已经做的优化:

(1)消费者引入线程池,数据在拉取到后立即异步处理,spring-kafka自动提交设置为false(默认)。
(2)分区提高到30个(原为12个),有一些提高。
(3)批量消费用List<ConsumerRecord<?,?>>,拉取到后再循环异步消费,因为fetch-min-size默认为1,不知道批量有没有生效,批量时max-poll-records设置为10

@KafkaListener(topics = {"Topic1"},containerFactory = "batchFactory")
    public void listen1(ConsumerRecord<?, ?> record) {
      String threadName = Thread.currentThread().getName();
        long start = new Date().getTime();
       //异步将一些kafka信息入库,为了比较kafka写入时间与当前线程读取消息时间
       apiAsyncService.addKafkaLog(start,record,threadName);
       //异步业务处理
       listenAsyncService.listen1Task(record);
    }