kafka想要实现每个消费者线程在它负责的分区的积压数据大小达到指定阈值后消费数据,若达不到指定阈值则等待指定时间后再消费。

雨山前星天外 发表于: 2021-03-25   最后更新时间: 2021-03-28 16:34:41   1,287 游览
  1. 想要实现同一分区积压的数据大小达不到某个阈值时,消费者等待指定时间后再消费这些积压数据。

  2. 我们用的是阿里云Kafka集群,目前我负责的业务的主题分区数量为50,消费者数量为25,每条消息的大小在400B ~ 600B,消费者相关配置:

    fetch.min.bytes:2560000 //希望消费者从负责的每个分区中单次至少拉取出这个大小的数据
    fetch.max.wait.ms:2000 //若达不到上述的大小,则消费者等待2秒再返回积压的数据消费
    max.poll.records:10000
    其他均为默认配置
    
  3. 目前看到的情况就是:消费者在消费它负责的一个分区的数据时,并没有等待2秒再返回积压的数据进行消费,因为每次只poll出十几条甚至几条消息进行消费(远未达到2560000),并且间隔时间并没有等于或是大于2秒,普遍都是在100ms以内就进行了下一次消费。

  4. 请问这些参数如何调整才能达到上述的效果,感谢!!

发表于 2021-03-25
添加评论

最大数量也是有限制的。

fetch.max.bytes:服务器为拉取请求返回的最大数据值。这不是绝对的最大值,如果在第一次非空分区拉取的第一条消息大于该值,该消息将仍然返回,以确保消费者继续工作。接收的最大消息大小通过message.max.bytes (broker config)max.message.bytes (topic config)定义。注意,消费者是并行执行多个提取的。

你这样设计,丢一次消息数量很可怕..

看了ConsumerConfig里对于fetch.max.bytes的描述,它的默认大小是52428800,应该不是这个影响了我...而且一条消息最多600B。看了权威指南里面对于消费端参数的说明感觉自己要改动的参数也就这俩儿,对于message.max.bytes (broker config)max.message.bytes (topic config),这俩参数的默认值也远比我这一条消息可能的最大值也大得多。请问,有可能是因为消费端缓冲区(内存)大小的限制吗?
另外谢谢您的“丢消息数量很可怕”的提醒,但是消费者的提交方式是自动提交,并且消费端接收到消息后并不是异步处理,而是由每个消费者线程自己同步处理,这还会有消息丢失的风险吗?如果只是重复消费的话,在消费端这边也是做了容错方案的,如果是担心因为丢失消息条数过多而导致重复消费的消息条数过多也没关系。还是由业务决定的,不需要严谨地单条单条的消费,倾向于单次poll出很多数据,然后对这些数据聚合处理。

看了ConsumerConfig里对于fetch.max.bytes的描述,它的默认大小是52428800,应该不是这个影响了我...而且一条消息最多600B。看了权威指南里面对于消费端参数的说明感觉自己要改动的参数也就这俩儿(fetch.min.bytesfetch.max.wait.ms),对于message.max.bytes (broker config)max.message.bytes (topic config),这俩参数的默认值也远比我这一条消息可能的最大值也大得多。请问,有可能是因为消费端缓冲区(内存)大小的限制吗?
另外谢谢您的“丢消息数量很可怕”的提醒,但是消费者的提交方式是自动提交,并且消费端接收到消息后并不是异步处理,而是由每个消费者线程自己同步处理,这还会有消息丢失的风险吗?如果只是重复消费的话,在消费端这边也是做了容错方案的,如果是担心因为丢失消息条数过多而导致重复消费的消息条数过多也没关系。还是由业务决定的,不需要严谨地单条单条的消费,倾向于单次poll出很多数据,然后对这些数据聚合处理。

最终效果其实是想所有消费者线程每隔两秒批量消费一次数据,调这些参数只是想偷个懒....如果实在不行的话....我就改成用定时任务去拉取消费数据,哈哈哈~

你的答案

查看kafka相关的其他问题或提一个您自己的问题