请问kafka connect的消费者一个批次拉取的消息量怎么控制?

Thor 发表于: 2021-04-23   最后更新时间: 2021-04-27 19:15:01   3,256 游览

kafka connect的sink connectorput方法一次拉取的消息量如何配置?

@Override
public void put(final Collection<SinkRecord> sinkRecords) {}

如上sinkRecords.size()一直都是四百多,配置过consumer.properties

max.poll.records
max.partition.fetch.bytes
fetch.min.bytes
fetch.max.wait.ms

都没用

刚开始用kafka connect,希望不吝赐教。

在线,3小时前登录
发表于 2021-04-23
¥2.0
fetch.max.bytes

试试这个,参考来自:
https://www.orchome.com/535

Thor -> 半兽人 3年前

没用呀,我的consumer.properties中配的是这样的,connector运行稳定之后每个批次都是414条

max.poll.records=9999
max.partition.fetch.bytes=104857600
fetch.min.bytes=2097152
fetch.max.wait.ms=10000
fetch.max.bytes=104857600
半兽人 -> Thor 3年前

kafka连接器底层其实就是消费者,一个批次返回的逻辑是:

当要打包消息的批次数量不满足fetch.min.bytes时,则等待fetch.max.wait.ms之后,不管满足不满足,将这个批次的消息发送给消费者。

一条消息的大小 * n = 批次总大小。

按照你的配置:

批次总大小是一定是大于fetch.min.bytes,因为你设置了等待10秒,所以基本只有fetch.min.bytes在起作用。

其他的参数都够用了,所以你只需通过继续增加

fetch.min.bytes

来观察变化。

Thor -> 半兽人 3年前

你好,我刚才把fetch.min.bytes增大了10倍,可每次拉取时sinkRecords.size()还是稳定在414,感觉consumer.properties不起作用

半兽人 -> Thor 3年前

consumer.properties是没用的,这个是在使用消费者命令行时,指定配置文件使用的,如:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --consumer.config config/consumer.properties

你在代码里,应该类似消费者这种

Properties props = new Properties();
props.setProperty("fetch.min.bytes", "2097152");
...
Thor -> 半兽人 3年前

嗯嗯,原来如此,十分感谢你的耐心;我还想问一下,kafka sink connect怎么对消费者进行配置呢?
我只知道需要重写start(),put(),flush(),stop()方法,貌似没有地方需要新建消费者,也就没有地方对消费者进行配置

props.setProperty("fetch.min.bytes", "2097152");

应该写在哪里呢?

半兽人 -> Thor 3年前

我今天在外面,现在没办法帮你查看。

半兽人 -> Thor 3年前

大部分场景下,kafka是批次获取消息的,默认的配置基本能达到物理机的最大能力,一般都卡在处理速度上,而不是拉取上。

Thor -> 半兽人 3年前

我懂了,就是说默认配置就够用了,重点优化put()方法中的处理逻辑,非常感谢您的解答!

你的答案

查看kafka相关的其他问题或提一个您自己的问题