您好,有个问题请教下。
kafka版本2.6.0,单节点(centOS7.6,4核16G),主题名“Topic1”,共30个分区,
自定义containerFactory命名为“batchFactory”,concurrency=30,其他配置均为spring-kafka.2.3.4.RELEASE
默认,消费者组目前一共有4个。
由于业务对实时性要求比较高,且数据在整点前后几分钟时数据相对较大,目前整点数据量在2000条/每秒,根据对kafka写入时间与消费者读取消息时间记录后做对比,大部分数据在kafka写入50ms后便由消费者拉取,但某些分区偶尔会存在1-3秒延迟后才由消费者拉取(大多集中在数据量大时,30万条左右数据出现200条大于1秒延迟的数据,最大的将近4秒),请问下:
1)后续应该做哪些优化能提高消费速度?
2)kafka搭建集群会不会提升消费速度?
3)如果消费者做3节点集群,30个分区,每个节点设置10个线程,还是30个线程?
感谢。
(1)消费者引入线程池,数据在拉取到后立即异步处理,spring-kafka自动提交设置为false(默认)。
(2)分区提高到30个(原为12个),有一些提高。
(3)批量消费用List<ConsumerRecord<?,?>>
,拉取到后再循环异步消费,因为fetch-min-size
默认为1
,不知道批量有没有生效,批量时max-poll-records设置为10
@KafkaListener(topics = {"Topic1"},containerFactory = "batchFactory")
public void listen1(ConsumerRecord<?, ?> record) {
String threadName = Thread.currentThread().getName();
long start = new Date().getTime();
//异步将一些kafka信息入库,为了比较kafka写入时间与当前线程读取消息时间
apiAsyncService.addKafkaLog(start,record,threadName);
//异步业务处理
listenAsyncService.listen1Task(record);
}