返回到文章

采纳

编辑于

kafka bootstrap.servers中有不可用的server导致producer失败丢消息

kafka

kafka官方文档的Producer的bootstrap.servers的解释中这样写到

This list should be in the form host1:port1,host2:port2,.... Since these servers are just used for the initial connection to discover the full cluster membership (which may change dynamically), this list need not contain the full set of servers (you may want more than one, though, in case a server is down). If no server in this list is available sending data will fail until on becomes available.

bootstrap.servers中不需要包含所有的brokers,you may want more than one, though, in case a server is down
应该是说在list中只要有一个可用的就行,producer第一次连接获取metadata时会遍历list中的broker,一个不行会换另一个

但是我测试发现并不会换

我搭建一个集群,b1,b2,b3. producer的bootstrap.servers中写b1,b2,b4(b4不存在), 或者把b1杀掉,bootstrap.servers中写b1,b2,b3(b1已经死了), 总之就是bootstrap.servers中有不可用的broker

启动producer,send的时候有一定的概率会抛出IO Error异常,持续60s,然后Timeout Exception: failed to update metadata in 60000ms.

然后producer就退出了,消息没发出去(丢失)。

for(int i = 0; i < 100; ++i){
    pr = new Producer(properties);
    pr.send(xx)
    pr.close();
}

总会有一些producer选到b1,或者b4,然后就发生错误了。

producer首先从bootstrap servers列表中选一个broker发送metadata请求,根据metadata确定本条消息应该发到哪台机器,然后和那台机器建立连接,把消息发过去。

但是如果选到的 boostrap servers中不可用的b1,或者b4,获取不到metadata,从实验来看并没有换其他的broker尝试,我在源码中也找了,没看到相关的实现。

2018-03-08 15:39:11,779 DEBUG [org.apache.kafka.clients.producer.internals.Sender] - Starting Kafka producer I/O thread.
2018-03-08 15:39:11,854 DEBUG [org.apache.kafka.clients.NetworkClient] - Trying to send metadata request to node -1
2018-03-08 15:39:11,855 DEBUG [org.apache.kafka.clients.NetworkClient] - Init connection to node -1 for sending metadata request in the next iteration
2018-03-08 15:39:11,855 DEBUG [org.apache.kafka.clients.NetworkClient] - Initiating connection to node -1 at 10.142.233.55:9092.
2018-03-08 15:39:11,857 DEBUG [org.apache.kafka.clients.NetworkClient] - Trying to send metadata request to node -1
2018-03-08 15:39:11,966 DEBUG [org.apache.kafka.clients.NetworkClient] - Trying to send metadata request to node -1
2018-03-08 15:39:12,067 DEBUG [org.apache.kafka.clients.NetworkClient] - Trying to send metadata request to node -1
2018-03-08 15:39:12,171 DEBUG [org.apache.kafka.clients.NetworkClient] - Trying to send metadata request to node -1
2018-03-08 15:39:12,275 DEBUG [org.apache.kafka.clients.NetworkClient] - Trying to send metadata request to node -1
2018-03-08 15:39:12,376 DEBUG [org.apache.kafka.clients.NetworkClient] - Trying to send metadata request to node -1
2018-03-08 15:39:12,482 DEBUG [org.apache.kafka.clients.NetworkClient] - Trying to send metadata request to node -1
2018-03-08 15:39:12,583 DEBUG [org.apache.kafka.clients.NetworkClient] - Trying to send metadata request to node -1
2018-03-08 15:39:12,687 DEBUG [org.apache.kafka.clients.NetworkClient] - Trying to send metadata request to node -1
2018-03-08 15:39:12,788 DEBUG [org.apache.kafka.clients.NetworkClient] - Trying to send metadata request to node -1
2018-03-08 15:39:12,889 DEBUG [org.apache.kafka.clients.NetworkClient] - Trying to send metadata request to node -1
2018-03-08 15:39:12,997 DEBUG [org.apache.kafka.clients.NetworkClient] - Trying to send metadata request to node -1
2018-03-08 15:39:13,102 DEBUG [org.apache.kafka.clients.NetworkClient] - Trying to send metadata request to node -1
2018-03-08 15:39:13,208 DEBUG [org.apache.kafka.clients.NetworkClient] - Trying to send metadata request to node -1
2018-03-08 15:39:13,266 WARN [org.apache.kafka.common.network.Selector] - Error in I/O with /10.142.233.55
java.net.ConnectException: Connection refused
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:744)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:238)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
    at java.lang.Thread.run(Thread.java:745)
2018-03-08 15:39:13,266 DEBUG [org.apache.kafka.clients.NetworkClient] - Node -1 disconnected.

持续60s,然后就update metadata TimeoutException了。


kafka版本是 0.8.2.1,试过0.10的版本好像也一样。

方便的话可以在自己的集群上测试下,告诉我结果

或者熟悉源码及相关流程的指点一下?

感激不尽!