我看了下kafka-manager中的Bytes In 在500k,Bytes Out在700k的时候会报错:
kafka-producer-network-thread | producer-1","info":"send kafka message failed, error:Expiring 18 record(s) for camera-0 due to 30002 ms has passed since batch creation plus linger time
目前我们的业务场景是针对链路的信息采集,包括controller、dubbo、druid等的采集,每个请求会发50条数据,并且数据中包含SQL语句,参数等,所以单条数据量比较大。通过kafka-manager可以看到每个broker Bytes In =500k,Bytes Out = 700k,而其中针对交易的场景数据量会比较大,所以交易的应用会报上述错误。按照上述参数调整后。
副本设置为2,Poducer 的配置
batch.size=4000
linger.ms=1
acks=1
retries=0
依然会报如下错误:
error:Failed to allocate memory within the configured max blocking time 60000 ms 等。
目前我将分区扩大到20个,使用1个kafkaProducer发送数据,目前只是在抢购高峰期会出现这种错误:Expiring 3 record(s) for camera-5 due to 30009 ms has passed since batch creation plus linger time. 根据 https://issues.apache.org/jira/browse/KAFKA-4557 描述,准备提升kafka版本至0.10.2.0。 然后再提高kafkaProducer里的request.timeout.ms至60000ms。
你这个错误是达到的消息,比发送的消息要快的多了。这几个参数确实可以缓解,topic中副本最好主备就好。
大神的意思是副本设置为2个,并且加上这些配置吗?
对的。
acks=1已经表示Leader接收即返回成功,麻烦问下这里减少副本有什么作用吗?
减少kafka之间的负载。
按照您的意思,副本设置为2,Poducer 的配置
batch.size=4000 linger.ms=1 acks=1 retries=0
我看了下kafka-manager中的Bytes In 在500k,Bytes Out在700k的时候会报错:
什么环境 都超30秒了。
线上环境的,而且这个值一直在增加。 Expiring 4 record(s) for camera-5 due to 29717795 ms has passed since last append.
抱歉,最近在忙,你可以描述一下你的整个环境、和场景。
好的,非常感谢。
目前我们的业务场景是针对链路的信息采集,包括controller、dubbo、druid等的采集,每个请求会发50条数据,并且数据中包含SQL语句,参数等,所以单条数据量比较大。通过kafka-manager可以看到每个
broker Bytes In =500k,Bytes Out = 700k
,而其中针对交易的场景数据量会比较大,所以交易的应用会报上述错误。按照上述参数调整后。副本设置为2,Poducer 的配置
batch.size=4000 linger.ms=1 acks=1 retries=0
依然会报如下错误:
你的集群有几个,topic分区多少?
buffer.memory:增加大发送总缓存。
ack=0,日志类消息不要确认了。
linger.ms=0,不要等待了。
1个集群3个broker, topic 10个分区
buffer.memory默认: 33554432(32M) ,扩大到多少呢?
1个集群3个broker说的有问题额。1个broker=1个集群。
你们用的什么客户端写的?一定要使用非阻塞式发送。
客户端用java。 非阻塞式? send()发送我们都是让其在异步线程池中进行的。
send().get()是阻塞。单send非阻塞,不需要异步线程吧,kafka这吞吐很高额。
使用的方法是:
Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
我们在10个异步线程池调用send()方法。 但是初始化时,只建立了1个
producer = new KafkaProducer<>(properties);
所以有可能会造成上述错误? 按照您的提示,去掉线程池再试试。
你的并发数多少?
并发数是1000多,会产生20几万的数据量.同时会向kafka发送数据。
单台并发多少,如果你消息的实时性要求不高,那就增大阻塞时间,和提高buffer的量来缓解。
还有,你该增加集群和分区数了。
可以考虑在一个应用创建多个KafkaProducer,发送的时候,与KafkaProducer的个数取模,发送数据。你试过这样的方式没? 打算压测试试
试过,当并发到50的时候 会新建一个连接。
这里怎么去判断并发50,建立新连接呢? >_<
代码逻辑实现的 +1 和 -1 ,当有发送+1,当发送完成-1,取差即是并发。
Hi,我环境里试过了,为什么起2个KafkaProducer,线程
kafka-producer-network-thread | producer-1和 kafka-producer-network-thread | producer-2
会死锁呢?RecordAccumulator.append()在这里会死锁?
是你写的有问题吧。
贴出来看看。
/ 初始化producers个数 / private static int KAFKA_PRODUCES_COUNT = Integer.parseInt(XxlConfClient.get("zzjr-camera.kafka.producers", "1")); private static KafkaProducer<String, String>[] producers = new KafkaProducer[KAFKA_PRODUCES_COUNT]; static{ Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer"); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer"); properties.put(ProducerConfig.BATCH_SIZE_CONFIG,XxlConfClient.get("zzjr-camera.kafka.batch.size", "4000")); properties.put(ProducerConfig.LINGER_MS_CONFIG,XxlConfClient.get("zzjr-camera.kafka.linger.ms", "0")); properties.put(ProducerConfig.ACKS_CONFIG,XxlConfClient.get("zzjr-camera.kafka.acks", "0")); properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,XxlConfClient.get("zzjr-camera.kafka.buffer.memory", "67108864")); properties.put(ProducerConfig.RETRIES_CONFIG, XxlConfClient.get("zzjr-camera.kafka.retries", "0")); properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, XxlConfClient.get("zzjr-camera.kafka.compression.type", "none")); //初始化kafkaProducers for(int i =0; i< KAFKA_PRODUCES_COUNT; i++){ producers[i] = new KafkaProducer<>(properties); } } / kafka发送消息 @param entity @param <T> / public static <T> void send(final T entity){ try { int partition = new Random().nextInt(PARTITION); ProducerRecord record = new ProducerRecord<>(TOPIC, partition, "", ); producers[new Random().nextInt(KAFKA_PRODUCES_COUNT)].send(record, new SendCallback(record, 0)); } catch (Exception e) { logger.error("Method KafkaProducerClient.send() error case:"+ e.getMessage()); } }
通过VisualVM 看到线程kafka-producer-network-thread | producer-1 和 kafka-producer-network-thread | producer-2 阻塞了。
你为什么要指定分区呢?
随机分区就好了,new SendCallback()这个也去了吧,不需要回调。
1.我这里随机数计算分区和你说的去掉分区效果一样吗?
2.new SendCallback()去掉后看不到消息发送成功与否了吧?
1、如果你kafka端调整,那新增加的分区 永远写不进去。
2、是的,如果你收到了报错的消息,然后呢?你能处理么。
你看不到是哪条消息报的错。
目前我将分区扩大到20个,使用1个kafkaProducer发送数据,目前只是在抢购高峰期会出现这种错误:Expiring 3 record(s) for camera-5 due to 30009 ms has passed since batch creation plus linger time. 根据 https://issues.apache.org/jira/browse/KAFKA-4557 描述,准备提升kafka版本至0.10.2.0。 然后再提高kafkaProducer里的request.timeout.ms至60000ms。
1、同意你说的,增加分区后,会动态分区到新增的分区,不会存在重启等等。
2、我的回调是这么做的
/* producer回调 */ static class SendCallback implements Callback { ProducerRecord<String, String> record; int sendSeq = 0; public SendCallback(ProducerRecord record, int sendSeq){ this.record = record; this.sendSeq = sendSeq; } @Override public void onCompletion(RecordMetadata recordMetadata, Exception e){ //send success if (null == e) { return; } //send failed logger.error("send kafka message failed, error:" + e.getMessage()); if (sendSeq < 1) { producers[new Random().nextInt(KAFKA_PRODUCES_COUNT)].send(record, new SendCallback(record, ++sendSeq)); } } }
测试过回调吗?无法重发的。所以我后续就把回调去了。
还没测试到这一步,目前其他部分发送数据都还报错。
目前我将分区扩大到20个,使用1个kafkaProducer发送数据,目前只是在抢购高峰期会出现这种错误:
根据 https://issues.apache.org/jira/browse/KAFKA-4557 描述,准备提升kafka版本至0.10.2.0。
然后再提高kafkaProducer里的request.timeout.ms至60000ms。
这里你还有其他建议吗? > - <
随机可以稍微优化一下,用轮询。
https://www.orchome.com/685
你好,方便加下的你微信吗? 有些其他问题,想向你请教下。
hi,麻烦再请假下:上述的问题根源,看了几天源码,做了测试,大概能够找到问题了。增加了 request.timeout.ms =60000 和 buffer.memory = 96M。 但是在抢购高峰期,依然会出现
目前集群
brokers=5 partition=50
针对抢购大概3分钟内出现的超时问题,可以将request.timeout.ms=5分钟吗?
是不是需要更大的buffer.memory ?
有其他办法吗?
还可以选择 增加linger.time、建立多个kafkaproducer ?
buffer.memory持续增大,你的消息体积很大吗?
不算大,消息内容基本就是SQL、请求时间以及封装的JSON格式的数据。 抢购2分钟时间产生超过100万条数据
你的答案