返回到文章

采纳

编辑于

kafka生产数据时报TimeoutException异常

kafka

在生产线程运行过程中,有的生产成功,有的发生下面的异常

记录大小11371
生产的数据条数: 11371
记录大小11392

org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) expired due to timeout while requesting metadata from brokers for testTopic9-1

16/06/17 11:02:23 ERROR internals.RecordBatch: Error executing user-provided callback on message for topic-partition testTopic9-1:

生产了数据:16
生产的数据条数: 11392
记录大小11450
生产的数据条数: 11450
记录大小11434
生产的数据条数: 11434
记录大小11410

org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) expired due to timeout while requesting metadata from brokers for testTopic9-0

16/06/17 11:02:24 ERROR internals.RecordBatch: Error executing user-provided callback on message for topic-partition testTopic9-0:

生产的数据条数: 11410
记录大小11397
生产的数据条数: 11397

org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) expired due to timeout while requesting metadata from brokers for testTopic9-1

16/06/17 11:02:25 ERROR internals.RecordBatch: Error executing user-provided callback on message for topic-partition testTopic9-1:
org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) expired due to timeout while requesting metadata from brokers for testTopic9-0

16/06/17 11:02:25 ERROR internals.RecordBatch: Error executing user-provided callback on message for topic-partition testTopic9-0:

生产者主要配置参数如下

#The total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are sent faster than they can be delivered to the server the producer will block for max.block.ms after which it will throw an exception.
#This setting should correspond roughly to the total memory the producer will use, but is not a hard bound since not all memory the producer uses is used for buffering. Some additional memory will be used for compression (if compression is enabled) as well as for maintaining in-flight requests.
#The buffer.memory controls the total amount of memory available to the producer for buffering. If records are sent faster than they can be transmitted to the server then this buffer space will be exhausted. When the buffer space is exhausted additional send calls will block. The threshold for time to block is determined by max.block.ms after which it throws a TimeoutException.
#为生产者提供的可控制的内存大小(单位字节)1G  默认值为33554432
buffer.memory=1000000000


# 批量生产的请求大小,单位字节100M  默认值为1048576 The maximum size of a request in bytes. This is also effectively a cap on the maximum record size. Note that the server has its own cap on record size which may be different from this. This setting will limit the number of record batches the producer will send in a single request to avoid sending huge requests.
#批量发送数据的大小设定为3M时,经过业务封装序列化后变为17M
max.request.size=100000000


#默认值为 131072,设置100M The size of the TCP send buffer (SO_SNDBUF) to use when sending data.   
send.buffer.bytes=100000000

#最长阻塞时间(当buffer.memory的内存不足时,将被阻塞发送数据到集群),单位毫秒 默认值为60000 The configuration controls how long KafkaProducer.send() and KafkaProducer.partitionsFor() will block.These methods can be blocked either because the buffer is full or metadata unavailable.Blocking in the user-supplied serializers or partitioner will not be counted against this timeout.
max.block.ms=60000

#The producer config block.on.buffer.full has been deprecated and will be removed in future release. Currently its default value has been changed to false. The KafkaProducer will no longer throw BufferExhaustedException but instead will use max.block.ms value to block, after which it will throw a TimeoutException. If block.on.buffer.full property is set to true explicitly, it will set the max.block.ms to Long.MAX_VALUE and metadata.fetch.timeout.ms will not be honoured
metadata.fetch.timeout.ms=60000

#压缩类型 The compression type for all data generated by the producer. The default is none (i.e. no compression). Valid values are none, gzip, snappy, or lz4. Compression is of full batches of data, so the efficacy of batching will also impact the compression ratio (more batching means better compression).
compression.type=none

#发送失败重试次数,默认值为0 Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error. Note that this retry is no different than if the client resent the record upon receiving the error. Allowing retries will potentially change the ordering of records because if two records are sent to a single partition, and the first fails and is retried but the second succeeds, then the second record may appear first.
retries=2

#  本地缓存大小,相当于记录总数,单位为字节,通过后台一个线程批量写入kafka集群,每个分区都有缓存,默认值为16384
#The producer will attempt to batch records together into fewer requests whenever multiple records are being sent to the same partition. This helps performance on both the client and the server. This configuration controls the default batch size in bytes.
#No attempt will be made to batch records larger than this size.
#Requests sent to brokers will contain multiple batches, one for each partition with data available to be sent.
#A small batch size will make batching less common and may reduce throughput (a batch size of zero will disable batching entirely). A very large batch size may use memory a bit more wastefully as we will always allocate a buffer of the specified batch size in anticipation of additional records.
batch.size=10