Kafka Producer配置

原创
半兽人 发表于: 2017-02-14   最后更新时间: 2021-01-14 23:37:59  
{{totalSubscript}} 订阅, 61,308 游览

3.3 生产者配置

java生产者配置:

NAME DESCRIPTION TYPE DEFAULT VALID VALUES IMPORTANCE
bootstrap.servers host/port列表,用于初始化建立和Kafka集群的连接。列表格式为host1:port1,host2:port2,....,无需添加所有的集群地址,kafka会根据提供的地址发现其他的地址(你可以多提供几个,以防提供的服务器关闭) list high
key.serializer 实现 org.apache.kafka.common.serialization.Serializer 接口的 key 的 Serializer 类。 class high
value.serializer 实现 org.apache.kafka.common.serialization.Serializer 接口的value 的 Serializer 类。 class high
acks 生产者需要leader确认请求完成之前接收的应答数。此配置控制了发送消息的耐用性,支持以下配置:
acks=0 如果设置为0,那么生产者将不等待任何消息确认。消息将立刻添加到socket缓冲区并考虑发送。在这种情况下不能保障消息被服务器接收到。并且重试机制不会生效(因为客户端不知道故障了没有)。每个消息返回的offset始终设置为-1。
acks=1,这意味着leader写入消息到本地日志就立即响应,而不等待所有follower应答。在这种情况下,如果响应消息之后但follower还未复制之前leader立即故障,那么消息将会丢失。
acks=all 这意味着leader将等待所有副本同步后应答消息。此配置保障消息不会丢失(只要至少有一个同步的副本或者)。这是最强壮的可用性保障。等价于acks=-1。
string 1 [all, -1, 0, 1] high
buffer.memory 生产者用来缓存等待发送到服务器的消息的内存总字节数。如果消息发送比可传递到服务器的快,生产者将阻塞max.block.ms之后,抛出异常。
此设置应该大致的对应生产者将要使用的总内存,但不是硬约束,因为生产者所使用的所有内存都用于缓冲。一些额外的内存将用于压缩(如果启动压缩),以及用于保持发送中的请求。
long 33554432 [0,...] high
compression.type 数据压缩的类型。默认为空(就是不压缩)。有效的值有 none,gzip,snappy, 或 lz4。压缩全部的数据批,因此批的效果也将影响压缩的比率(更多的批次意味着更好的压缩)。 string none high
retries 设置一个比零大的值,客户端如果发送失败则会重新发送。注意,这个重试功能和客户端在接到错误之后重新发送没什么不同。如果max.in.flight.requests.per.connection没有设置为1,有可能改变消息发送的顺序,因为如果2个批次发送到一个分区中,并第一个失败了并重试,但是第二个成功了,那么第二个批次将超过第一个。 int 0 [0,...,2147483647] high
ssl.key.password 密钥仓库文件中的私钥的密码。 password null high
ssl.keystore.location 密钥仓库文件的位置。可用于客户端的双向认证。 string null high
ssl.keystore.password 密钥仓库文件的仓库密码。只有配置了ssl.keystore.location时才需要。 password null high
ssl.truststore.location 信任仓库的位置 string null high
ssl.truststore.password 信任仓库文件的密码 password null high
batch.size 当多个消息要发送到相同分区的时,生产者尝试将消息批量打包在一起,以减少请求交互。这样有助于客户端和服务端的性能提升。该配置的默认批次大小(以字节为单位):
不会打包大于此配置大小的消息。
发送到broker的请求将包含多个批次,每个分区一个,用于发送数据。
较小的批次大小有可能降低吞吐量(批次大小为0则完全禁用批处理)。一个非常大的批次大小可能更浪费内存。因为我们会预先分配这个资源。
int 16384 [0,...] medium
client.id 当发出请求时传递给服务器的id字符串。这样做的目的是允许服务器请求记录记录这个【逻辑应用名】,这样能够追踪请求的源,而不仅仅只是ip/prot。 string "" medium
connections.max.idle.ms 多少毫秒之后关闭闲置的连接。 long 540000 medium
linger.ms 生产者组将发送的消息组合成单个批量请求。正常情况下,只有消息到达的速度比发送速度快的情况下才会出现。但是,在某些情况下,即使在适度的负载下,客户端也可能希望减少请求数量。此设置通过添加少量人为延迟来实现。- 也就是说,不是立即发出一个消息,生产者将等待一个给定的延迟,以便和其他的消息可以组合成一个批次。这类似于Nagle在TCP中的算法。此设置给出批量延迟的上限:一旦我们达到分区的batch.size值的记录,将立即发送,不管这个设置如何,但是,如果比这个小,我们将在指定的“linger”时间内等待更多的消息加入。此设置默认为0(即无延迟)。假设,设置 linger.ms=5,将达到减少发送的请求数量的效果,但对于在没有负载情况,将增加5ms的延迟。 long 0 [0,...] medium
max.block.ms 该配置控制 KafkaProducer.send() 和 KafkaProducer.partitionsFor() 将阻塞多长时间。此外这些方法被阻止,也可能是因为缓冲区已满或元数据不可用。在用户提供的序列化程序或分区器中的锁定不会计入此超时。 long 60000 [0,...] medium
max.request.size 请求的最大大小(以字节为单位)。此设置将限制生产者的单个请求中发送的消息批次数,以避免发送过大的请求。这也是最大消息批量大小的上限。请注意,服务器拥有自己的批量大小,可能与此不同。 int 1048576 [0,...] medium
partitioner.class 实现Partitioner接口的的Partitioner类。 class org.apache.kafka.clients.producer.internals.DefaultPartitioner medium
receive.buffer.bytes 读取数据时使用的TCP接收缓冲区(SO_RCVBUF)的大小。如果值为-1,则将使用OS默认值。 int 32768 [-1,...] medium
request.timeout.ms 该配置控制客户端等待请求响应的最长时间。如果在超时之前未收到响应,客户端将在必要时重新发送请求,如果重试耗尽,则该请求将失败。 这应该大于replica.lag.time.max.ms,以减少由于不必要的生产者重试引起的消息重复的可能性。 int 30000 [0,...] medium
sasl.jaas.config JAAS配置文件使用的格式的SASL连接的JAAS登录上下文参数。这里描述JAAS配置文件格式。该值的格式为:'(=)*;' password null medium
sasl.kerberos.service.name Kafka运行的Kerberos主体名称。可以在Kafka的JAAS配置或Kafka的配置中定义。 string null medium
sasl.mechanism SASL机制用于客户端连接。这是安全提供者可用与任何机制。GSSAPI是默认机制。 string GSSAPI medium
security.protocol 用于与broker通讯的协议。 有效值为:PLAINTEXT,SSL,SASL_PLAINTEXT,SASL_SSL。 string PLAINTEXT medium
send.buffer.bytes 发送数据时,用于TCP发送缓存(SO_SNDBUF)的大小。如果值为 -1,将默认使用系统的。 int 131072 [-1,...] medium
ssl.enabled.protocols 启用SSL连接的协议列表。 list TLSv1.2,TLSv1.1,TLSv1 medium
ssl.keystore.type 密钥存储文件的文件格式。对于客户端是可选的。 string JKS medium
ssl.protocol 最近的JVM中允许的值是TLS,TLSv1.1和TLSv1.2。 较旧的JVM可能支持SSL,SSLv2和SSLv3,但由于已知的安全漏洞,不建议使用SSL。 string TLS medium
ssl.provider 用于SSL连接的安全提供程序的名称。默认值是JVM的默认安全提供程序。 string null medium
ssl.truststore.type 信任仓库文件的文件格式。 string JKS medium
enable.idempotence 当设置为‘true’,生产者将确保每个消息正好一次复制写入到stream。如果‘false’,由于broker故障,生产者重试。即,可以在流中写入重试的消息。此设置默认是‘false’。请注意,启用幂等式需要将max.in.flight.requests.per.connection设置为1,重试次数不能为零。另外acks必须设置为“全部”。如果这些值保持默认值,我们将覆盖默认值。 如果这些值设置为与幂等生成器不兼容的值,则将抛出一个ConfigException异常。如果这些值设置为与幂等生成器不兼容的值,则将抛出一个ConfigException异常。 boolean false low
interceptor.classes 实现ProducerInterceptor接口,你可以在生产者发布到Kafka群集之前拦截(也可变更)生产者收到的消息。默认情况下没有拦截器。 list null low
max.in.flight.requests.per.connection 阻塞之前,客户端单个连接上发送的未应答请求的最大数量。注意,如果此设置设置大于1且发送失败,则会由于重试(如果启用了重试)会导致消息重新排序的风险。 int 5 [1,...] low
metadata.max.age.ms 在一段时间段之后(以毫秒为单位),强制更新元数据,即使我们没有看到任何分区leader的变化,也会主动去发现新的broker或分区。 long 300000 [0,...] low
metric.reporters 用作metrics reporters(指标记录员)的类的列表。实现MetricReporter接口,将受到新增加的度量标准创建类插入的通知。 JmxReporter始终包含在注册JMX统计信息中。 list "" low
metrics.num.samples 维护用于计算度量的样例数量。 int 2 [1,...] low
metrics.recording.level 指标的最高记录级别。 string INFO [INFO, DEBUG] low
metrics.sample.window.ms 度量样例计算上 long 30000 [0,...] low
reconnect.backoff.max.ms 重新连接到重复无法连接的代理程序时等待的最大时间(毫秒)。 如果提供,每个主机的回退将会连续增加,直到达到最大值。 计算后退增加后,增加20%的随机抖动以避免连接风暴。 long 1000 [0,...] low
reconnect.backoff.ms 尝试重新连接到给定主机之前等待的基本时间量。这避免了在循环中高频率的重复连接到主机。这种回退适应于客户端对broker的所有连接尝试。 long 50 [0,...] low
retry.backoff.ms 尝试重试指定topic分区的失败请求之前等待的时间。这样可以避免在某些故障情况下高频次的重复发送请求。 long 100 [0,...] low
sasl.kerberos.kinit.cmd Kerberos kinit 命令路径。 string /usr/bin/kinit low
sasl.kerberos.min.time.before.relogin Login线程刷新尝试之间的休眠时间。 long 60000 low
sasl.kerberos.ticket.renew.jitter 添加更新时间的随机抖动百分比。 double 0.05 low
sasl.kerberos.ticket.renew.window.factor 登录线程将睡眠,直到从上次刷新ticket到期时间的指定窗口因子为止,此时将尝试续订ticket。 double 0.8 low
ssl.cipher.suites 密码套件列表。这是使用TLS或SSL网络协议来协商用于网络连接的安全设置的认证,加密,MAC和密钥交换算法的命名组合。默认情况下,支持所有可用的密码套件。 list null low
ssl.endpoint.identification.algorithm 使用服务器证书验证服务器主机名的端点识别算法。 string null low
ssl.keymanager.algorithm 用于SSL连接的密钥管理因子算法。默认值是为Java虚拟机配置的密钥管理器工厂算法。 string SunX509 low
ssl.secure.random.implementation 用于SSL加密操作的SecureRandom PRNG实现。 string null low
ssl.trustmanager.algorithm 用于SSL连接的信任管理因子算法。默认值是JAVA虚拟机配置的信任管理工厂算法。 string PKIX low
transaction.timeout.ms 生产者在主动中止正在进行的交易之前,交易协调器等待事务状态更新的最大时间(以ms为单位)。如果此值大于broker中的max.transaction.timeout.ms设置,则请求将失败,并报“InvalidTransactionTimeout”错误。 int 60000 low
transactional.id 用于事务传递的TransactionalId。这样可以跨多个生产者会话的可靠性语义,因为它允许客户端保证在开始任何新事务之前使用相同的TransactionalId的事务已经完成。如果没有提供TransactionalId,则生产者被限制为幂等传递。请注意,如果配置了TransactionalId,则必须启用enable.idempotence。 默认值为空,这意味着无法使用事务。 string null non-empty string low

kafka >= 2.0.0

名称 描述 类型 默认 有效值 重要程度
sasl.client.callback.handler.class 实现AuthenticateCallbackHandler接口的SASL客户端回调处理程序类的全称。 class null 中间
sasl.login.callback.handler.class 实现AuthenticateCallbackHandler接口的SASL登录回调处理程序类的全称。对于broker来说,登录回调处理程序配置必须以监听器前缀和小写的SASL机制名称为前缀。例如,listener.name.sasl_ssl.scram-sha-256.sasl.login.callback.handler.class=com.example.CustomScramLoginCallbackHandler class null 中间
sasl.login.class 实现Login接口的类的全称。对于broker来说,login config必须以监听器前缀和SASL机制名称为前缀,并使用小写。例如,listener.name.sasl_ssl.scram-sha-256.sasl.login.class=com.example.CustomScramLogin class null 中间

kafka >= 2.1.0

名称 描述 类型 默认 有效值 重要程度
client.dns.lookup 控制客户端如何使用DNS查询。如果设置为 use_all_dns_ips,则依次连接到每个返回的IP地址,直到成功建立连接。断开连接后,使用下一个IP。一旦所有的IP都被使用过一次,客户端就会再次从主机名中解析IP(s)(然而,JVM和操作系统都会缓存DNS名称查询)。如果设置为 resolve_canonical_bootstrap_servers_only,则将每个引导地址解析成一个canonical名称列表。在bootstrap阶段之后,这和use_all_dns_ips的行为是一样的。如果设置为 default(已弃用),则尝试连接到查找返回的第一个IP地址,即使查找返回多个IP地址。 string use_all_dns_ips [default, use_all_dns_ips, resolve_canonical_bootstrap_servers_only] 中间
delivery.timeout.ms 调用send()返回后报告成功或失败的时间上限。这限制了消息在发送前被延迟的总时间,等待broker确认的时间(如果期望的话),以及允许重试发送失败的时间。如果遇到不可恢复的错误,重试次数已经用尽,或者消息被添加到一个达到较早发送到期期限的批次中,生产者可能会报告未能在这个配置之前发送记录。这个配置的值应该大于或等于request.timeout.mslinger.ms之和。 int 120000 (2 minutes) [0,...] 中间

kafka >= 2.7

名称 描述 类型 默认 有效值 重要程度
ssl.truststore.certificates 可信证书的格式由'ssl.truststore.type'指定。默认的SSL引擎工厂只支持带X.509证书的PEM格式。 password null
socket.connection.setup.timeout.max.ms 客户端等待建立socket连接的最大时间。连接设置超时时间将随着每一次连续的连接失败而成倍增加,直到这个最大值。为了避免连接风暴,超时时间将被应用一个0.2的随机因子,导致计算值在20%以下和20%以上的随机范围。 long 127000 (127 seconds) 中间
socket.connection.setup.timeout.ms 客户端等待建立socket连接的时间。如果在超时之前没有建立连接,客户端将关闭socket通道。 long 10000 (10 seconds) 中间
更新于 2021-01-14

XHX. 1年前

你好,请问下这个问题是什么原因呢?可以通过修改生产者配置修复吗?

org.apache.kafka.common.errors.TimeoutException: Expiring 8 record(s) for CHARGE_LANE_INT_TOPIC-0:140123 ms has passed since batch creation

半兽人 -> XHX. 1年前

你看下这篇文章先自查一下
https://www.orchome.com/141

雪狐 3年前

你好,我想问下我往多个topic里面发消息,是不是用一个producer就可以了,因为我topic会动态的增加或减少。

半兽人 -> 雪狐 3年前

是的 用一个就够了

技颠 4年前

大哥 我看buffer.memory这个缺省值是32M,我也没有设置这个值,但是我在运行的应用中dump了内存,发现org.apache.kafka.clients.producer.internals.RecordAccumulator这个类型关联的对象达到了100M(都是待发送的对象),这个情况是为什么呢?

半兽人 -> 技颠 4年前

不是硬约束,因为生产者所使用的所有内存都用于缓冲。
这句话有解释额。

技颠 -> 半兽人 4年前

那有别的办法能够限制在producer端的数据大小吗?如果不做硬性限制的话是有OOM的可能的

半兽人 -> 技颠 4年前

不会有oom的,java8之后有个新特性,是利用未被占用的内存。系统会自动控制这些,OOM是在满负荷情况下,需要使用的内存超过了你的设置,才会触发。

技颠 -> 半兽人 4年前

我们的应用堆内存很小,只有130M,在这种情况下没有办法能够控制kafka RecordAccumulator占用的内存大小吗?在日志中能看到一直在打印OOM异常,dump发现RecordAccumulator关联了100M的内存左右。这个OOM是属于达到了GC的阈值设置,即:GC overhead limit exceeded 。

您好,我有一个问题想请教一下,我们这边配置的ack为1,按理说主节点应答后kafka主节点日志应该会有消息,我客户端程序发了消息后,程序日志是正常的,但是kafka所有节点的消息日志都没有看到有发过的消息,这是为什么呀?

会不会是那个缓冲区,由于发消息太快,缓冲满了后把后加入的消息丢掉了?

确认过消息是丢了吗?

嗯 程序这边看日志是没有发现任务异常的,其他的消息也对比找过能够在kafka的消息日志里面找到

客户端程序丢失消息的情况有很多,比如测试程序结束了,但是消息还在缓冲区还没来得急发送,进程就结束了。

你可以用同步进行测试,在发送后面加.get()

嗯 非常感谢你的建议,丢消息这个情况是出现在线上,线上不好进行同步测试,我们都是异步在发送

5年前

message.max.bytes 默认值 比 broker max.request.size 默认值大,不是应该是produce的消息的最大值应该小于broker 的最大接收值吗?

半兽人 -> 5年前

现在不是这样吗?

-> 半兽人 5年前

上面说错了 现在 :
produce的消息的默认最值(max.request.size 默认值1048576)

borker(message.max.bytes 默认值1000012)大怎么存进去的?

-> 5年前

单位不一样吗

Lee 6年前

acks   生产者需要leader确认请求完成之前接收的应答数。此配置控制了发送消息的耐用性,支持以下配置:
acks=0 如果设置为0,那么生产者将不等待任何消息确认。消息将立刻【天际】到socket缓冲区并考虑发送

-------------
添加

半兽人 -> Lee 6年前

细心,已修改。

马踏紫陌 6年前

[2018-09-05 14:13:56,592] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 33 : {test=LEADER_NOT_AVAILABLE} 这个是什么原因?

半兽人 -> 马踏紫陌 6年前

第一次主题不存在时会打印。告警可忽视。

七零颂歌 6年前

2018-08-16 15:50:59.332 ERROR 11396 --- [ main] o.s.k.support.LoggingProducerListener : Exception thrown when sending a message with key='0' and payload='foo' to topic test:
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 1000 ms.

请问一下我用spring-kafka 生产者来发消息,代码按照官方指导的思路敲的,但是每次发送消息都报超时,服务器确认都是通的,求大佬指导一下原因是啥

半兽人 -> 七零颂歌 6年前

1、端口配置:9092。
2、listeners配置了吧,ip。

勇闯天涯 6年前

生产者配置文件是 producer.properties  这个吗?

半兽人 -> 勇闯天涯 6年前

这个只是给命令使用的,还必须要指定。

止殇 -> 半兽人 6年前

你好,请问下这个“给命令用”什么意思, 是用命令来启动生产者时,可以指定该配置文件吗?  如果是这样的话,那么就可java客户端生产设置的参数没有关系了。

半兽人 -> 止殇 6年前

这个文件对程序和命令是没有任何作用,使用的时候要指定才有效果。

止殇 -> 半兽人 6年前

哦。感谢了。

宁静致远 6年前

请问kafka的producer.properties的配置怎么才能生效呢

半兽人 -> 宁静致远 6年前

配置在客户端代码里,重新运行就生效了。

宁静致远 -> 半兽人 6年前

重启了  但是貌似只有server.properties生效了 producer.properties和consumer.properties都没有生效哇~

半兽人 -> 宁静致远 6年前

懂了,我以为你说的是客户端代码。
执行的时候带上 --producer.config config/producer.properties

可参考:https://www.orchome.com/454

宁静致远 -> 半兽人 6年前

可以了 赞赞赞~

宁静致远 -> 半兽人 6年前

大神 再问一个问题哈 配置了consumer.properties之后怎么在spark streaming端生效呢~

半兽人 -> 宁静致远 6年前

生效不了的, 这个consumer.properties只适用命令指定配置而已,用于测试。
spark streaming你所用的客户端有对应设置的。

宁静致远 -> 半兽人 6年前

spark streaming和kafka集成的配置都在kafkaParameters里写的,但是kafkaParameters又必须是Map<String,String>类型的,
而这个参数的配置又不能写成这样kafkaParameters.put("fetch.message.max.bytes","52428800");,"52428800"这个字符串java对不能自动转化为Long,所以。。。

半兽人 -> 宁静致远 6年前

这个真的好尴尬,属于bug,你在字符串带个L试试。。。

宁静致远 -> 半兽人 6年前

是啊,好尴尬。Exception in thread "main" java.lang.NumberFormatException: For input string: "52428800L"加完之后报这个错了,它的意思应该是这个配置参数是数字类型的而我输入了String类型的,但是kafkaParameters又不能是Map<String,Long>类型或者Map<String,Object>类型的~

無名 -> 宁静致远 6年前

spark streaming是scala写的吧,理论上用的也是java的客户端。

宁静致远 -> 無名 6年前

是scala写的  提供的有scala API,也有java API,我使用java写的~

半兽人 -> 宁静致远 6年前

java写的用这个呀。

Properties props = new Properties();

宁静致远 -> 半兽人 6年前

就在刚刚,终于知道是怎么回事了,哈哈哈哈哈哈哈,好开心。正确的配置确实是kafkaParameters.put("fetch.message.max.bytes","52428800"),因为刚开始我可能给设置成5242880000,超过了int的大小(估计这个值应该是int,不应该是Long),int的取值范围是(-2147483648~2147483648);而后来又给设置成52428800L(Long)了,所以也是转化的过程中报错了,所以最后配置成52428800就成功了。哈哈哈  谢谢大神的引导~~~~

半兽人 -> 宁静致远 6年前

哈哈,恭喜。

庸人自扰 -> 半兽人 3年前

大佬,我有kafka的connector-distribute.sh启动kafka connector 指定producer.properties不生效啊?

半兽人 -> 庸人自扰 3年前

到问题专区发起个提问吧 把完整命令发一下哦

查看kafka更多相关的文章或提一个关于kafka的问题,也可以与我们一起分享文章