返回到文章

采纳

编辑于 3年前

kafka想要实现每个消费者线程在它负责的分区的积压数据大小达到指定阈值后消费数据,若达不到指定阈值则等待指定时间后再消费。

kafka

  1. 想要实现同一分区积压的数据大小达不到某个阈值时,消费者等待指定时间后再消费这些积压数据。

  2. 我们用的是阿里云Kafka集群,目前我负责的业务的主题分区数量为50,消费者数量为25,每条消息的大小在400B ~ 600B,消费者相关配置:

    fetch.min.bytes:2560000 //希望消费者从负责的每个分区中单次至少拉取出这个大小的数据
    fetch.max.wait.ms:2000 //若达不到上述的大小,则消费者等待2秒再返回积压的数据消费
    max.poll.records:10000
    其他均为默认配置
    
  3. 目前看到的情况就是:消费者在消费它负责的一个分区的数据时,并没有等待2秒再返回积压的数据进行消费,因为每次只poll出十几条甚至几条消息进行消费(远未达到2560000),并且间隔时间并没有等于或是大于2秒,普遍都是在100ms以内就进行了下一次消费。

  4. 请问这些参数如何调整才能达到上述的效果,感谢!!