After restarting the Kafka cluster, the consumer reports that it cannot connect to the leader

千年祇园   Posted: 2022-12-01   Last updated: 2022-12-02 00:26:35   1,854 views

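After restarting the kafka cluster, zk shows the leader/follower nodes, but when I run the console consumer script on one of the follower nodes to receive messages, it reports that it cannot connect to the leader.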

kafka-console-consumer.sh --bootstrap-server 10.168.45.21:9092 --topic k

This is the error the consumer prints in the terminal:

Connection to node -2 (/10.168.45.21:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

Note: running zkServer.sh status on the three nodes shows their roles as follower/leader/follower.

[2022-12-01 15:17:58,375] INFO Creating /brokers/ids/1 (is it secure? false) (kafka.zk.KafkaZkClient)
[2022-12-01 15:17:58,397] ERROR Error while creating ephemeral at /brokers/ids/1, node already exists and owner '144199714271002624' does not match current session '72069007191048192' (kafka.zk.KafkaZkClient$CheckedEphemeral)
[2022-12-01 15:17:58,404] ERROR [KafkaServer id=1] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists
    at org.apache.zookeeper.KeeperException.create(KeeperException.java:126)
    at kafka.zk.KafkaZkClient$CheckedEphemeral.getAfterNodeExists(KafkaZkClient.scala:1904)
    at kafka.zk.KafkaZkClient$CheckedEphemeral.create(KafkaZkClient.scala:1842)
    at kafka.zk.KafkaZkClient.checkedEphemeralCreate(KafkaZkClient.scala:1809)
    at kafka.zk.KafkaZkClient.registerBroker(KafkaZkClient.scala:96)
    at kafka.server.KafkaServer.startup(KafkaServer.scala:319)
    at kafka.Kafka$.main(Kafka.scala:109)
    at kafka.Kafka.main(Kafka.scala)
[2022-12-01 15:17:58,407] INFO [KafkaServer id=1] shutting down (kafka.server.KafkaServer)
[2022-12-01 15:17:58,407] INFO [SocketServer listenerType=ZK_BROKER, nodeId=1] Stopping socket server request processors (kafka.network.SocketServer)
[2022-12-01 15:17:58,411] INFO [SocketServer listenerType=ZK_BROKER, nodeId=1] Stopped socket server request processors (kafka.network.SocketServer)
[2022-12-01 15:17:58,412] INFO [ReplicaManager broker=1] Shutting down (kafka.server.ReplicaManager)
[2022-12-01 15:17:58,414] INFO [LogDirFailureHandler]: Shutting down (kafka.server.ReplicaManager$LogDirFailureHandler)
[2022-12-01 15:17:58,415] INFO [LogDirFailureHandler]: Shutdown completed (kafka.server.ReplicaManager$LogDirFailureHandler)
[2022-12-01 15:17:58,415] INFO [LogDirFailureHandler]: Stopped (kafka.server.ReplicaManager$LogDirFailureHandler)
[2022-12-01 15:17:58,415] INFO [ReplicaFetcherManager on broker 1] shutting down (kafka.server.ReplicaFetcherManager)
[2022-12-01 15:17:58,417] INFO [ReplicaFetcherManager on broker 1] shutdown completed (kafka.server.ReplicaFetcherManager)
[2022-12-01 15:17:58,417] INFO [ReplicaAlterLogDirsManager on broker 1] shutting down (kafka.server.ReplicaAlterLogDirsManager)
[2022-12-01 15:17:58,418] INFO [ReplicaAlterLogDirsManager on broker 1] shutdown completed (kafka.server.ReplicaAlterLogDirsManager)
[2022-12-01 15:17:58,418] INFO [ExpirationReaper-1-Fetch]: Shutting down (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2022-12-01 15:17:58,474] INFO [ExpirationReaper-1-Fetch]: Stopped (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2022-12-01 15:17:58,474] INFO [ExpirationReaper-1-Fetch]: Shutdown completed (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2022-12-01 15:17:58,475] INFO [ExpirationReaper-1-Produce]: Shutting down (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2022-12-01 15:17:58,674] INFO [ExpirationReaper-1-Produce]: Stopped (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2022-12-01 15:17:58,674] INFO [ExpirationReaper-1-Produce]: Shutdown completed (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2022-12-01 15:17:58,675] INFO [ExpirationReaper-1-DeleteRecords]: Shutting down (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2022-12-01 15:17:58,675] INFO [ExpirationReaper-1-DeleteRecords]: Shutdown completed (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2022-12-01 15:17:58,675] INFO [ExpirationReaper-1-DeleteRecords]: Stopped (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2022-12-01 15:17:58,675] INFO [ExpirationReaper-1-ElectLeader]: Shutting down (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2022-12-01 15:17:58,675] INFO [ExpirationReaper-1-ElectLeader]: Stopped (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2022-12-01 15:17:58,675] INFO [ExpirationReaper-1-ElectLeader]: Shutdown completed (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2022-12-01 15:17:58,681] INFO [ReplicaManager broker=1] Shut down completely (kafka.server.ReplicaManager)
[2022-12-01 15:17:58,682] INFO [BrokerToControllerChannelManager broker=1 name=alterIsr]: Shutting down (kafka.server.BrokerToControllerRequestThread)
[2022-12-01 15:17:58,682] INFO [BrokerToControllerChannelManager broker=1 name=alterIsr]: Stopped (kafka.server.BrokerToControllerRequestThread)
[2022-12-01 15:17:58,682] INFO [BrokerToControllerChannelManager broker=1 name=alterIsr]: Shutdown completed (kafka.server.BrokerToControllerRequestThread)
[2022-12-01 15:17:58,687] INFO Broker to controller channel manager for alterIsr shutdown (kafka.server.BrokerToControllerChannelManagerImpl)
[2022-12-01 15:17:58,687] INFO [BrokerToControllerChannelManager broker=1 name=forwarding]: Shutting down (kafka.server.BrokerToControllerRequestThread)
[2022-12-01 15:17:58,687] INFO [BrokerToControllerChannelManager broker=1 name=forwarding]: Stopped (kafka.server.BrokerToControllerRequestThread)
[2022-12-01 15:17:58,687] INFO [BrokerToControllerChannelManager broker=1 name=forwarding]: Shutdown completed (kafka.server.BrokerToControllerRequestThread)
[2022-12-01 15:17:58,688] INFO Broker to controller channel manager for forwarding shutdown (kafka.server.BrokerToControllerChannelManagerImpl)
[2022-12-01 15:17:58,688] INFO Shutting down. (kafka.log.LogManager)
[2022-12-01 15:17:58,710] INFO Shutdown complete. (kafka.log.LogManager)
[2022-12-01 15:17:58,712] INFO [feature-zk-node-event-process-thread]: Shutting down (kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread)
[2022-12-01 15:17:58,712] INFO [feature-zk-node-event-process-thread]: Stopped (kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread)
[2022-12-01 15:17:58,712] INFO [feature-zk-node-event-process-thread]: Shutdown completed (kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread)
[2022-12-01 15:17:58,712] INFO [ZooKeeperClient Kafka server] Closing. (kafka.zookeeper.ZooKeeperClient)
[2022-12-01 15:17:58,816] INFO Session: 0x1000a6154f00000 closed (org.apache.zookeeper.ZooKeeper)
[2022-12-01 15:17:58,816] INFO EventThread shut down for session: 0x1000a6154f00000 (org.apache.zookeeper.ClientCnxn)
[2022-12-01 15:17:58,817] INFO [ZooKeeperClient Kafka server] Closed. (kafka.zookeeper.ZooKeeperClient)
[2022-12-01 15:17:58,817] INFO [ThrottledChannelReaper-Fetch]: Shutting down (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2022-12-01 15:17:59,449] INFO [ThrottledChannelReaper-Fetch]: Stopped (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2022-12-01 15:17:59,449] INFO [ThrottledChannelReaper-Fetch]: Shutdown completed (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2022-12-01 15:17:59,449] INFO [ThrottledChannelReaper-Produce]: Shutting down (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2022-12-01 15:18:00,449] INFO [ThrottledChannelReaper-Produce]: Stopped (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2022-12-01 15:18:00,449] INFO [ThrottledChannelReaper-Produce]: Shutdown completed (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2022-12-01 15:18:00,449] INFO [ThrottledChannelReaper-Request]: Shutting down (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2022-12-01 15:18:00,450] INFO [ThrottledChannelReaper-Request]: Stopped (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2022-12-01 15:18:00,450] INFO [ThrottledChannelReaper-Request]: Shutdown completed (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2022-12-01 15:18:00,450] INFO [ThrottledChannelReaper-ControllerMutation]: Shutting down (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2022-12-01 15:18:00,451] INFO [ThrottledChannelReaper-ControllerMutation]: Stopped (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2022-12-01 15:18:00,451] INFO [ThrottledChannelReaper-ControllerMutation]: Shutdown completed (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2022-12-01 15:18:00,452] INFO [SocketServer listenerType=ZK_BROKER, nodeId=1] Shutting down socket server (kafka.network.SocketServer)
[2022-12-01 15:18:00,477] INFO [SocketServer listenerType=ZK_BROKER, nodeId=1] Shutdown completed (kafka.network.SocketServer)
[2022-12-01 15:18:00,477] INFO Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics)
[2022-12-01 15:18:00,477] INFO Closing reporter org.apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics)
[2022-12-01 15:18:00,477] INFO Metrics reporters closed (org.apache.kafka.common.metrics.Metrics)
[2022-12-01 15:18:00,479] INFO Broker and topic stats closed (kafka.server.BrokerTopicStats)
[2022-12-01 15:18:00,482] INFO App info kafka.server for 1 unregistered (org.apache.kafka.common.utils.AppInfoParser)
[2022-12-01 15:18:00,482] INFO [KafkaServer id=1] shut down completed (kafka.server.KafkaServer)
[2022-12-01 15:18:00,482] ERROR Exiting Kafka. (kafka.Kafka$)
[2022-12-01 15:18:00,483] INFO [KafkaServer id=1] shutting down (kafka.server.KafkaServer)

[2022-12-01 15:17:58,397] ERROR Error while creating ephemeral at /brokers/ids/1, node already exists and owner '144199714271002624' does not match current session '72069007191048192' (kafka.zk.KafkaZkClient$CheckedEphemeral)

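Kafka failed to start because /brokers/ids/1 already exists in zk.

Why wasn't your zk data released in time?

In my run.sh, the start part starts the 3 zk nodes one by one and then the 3 kafka brokers one by one; the stop part stops the 3 kafka brokers one by one, then runs sleep 12, and finally stops the 3 zookeeper nodes one by one.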

zk is started like this:

/opt/local/kafka/zookeeper/apache-zookeeper-3.6.3-bin/bin/zkServer.sh start

kafka is started like this:

/opt/local/kafka/kafka_2.13-3.0.0/bin/kafka-server-start.sh -daemon /opt/local/kafka/kafka_2.13-3.0.0/config/server.properties

kafka is stopped like this:

/opt/local/kafka/kafka_2.13-3.0.0/bin/kafka-server-stop.sh

zk is stopped like this:

/opt/local/kafka/zookeeper/apache-zookeeper-3.6.3-bin/bin/zkServer.sh stop

Is there anything wrong with a script like this?

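1. When stopping kafka, a plain sleep is too simplistic. Check that the kafka process is really gone; I suspect your kafka was never actually stopped.
2. Clean up the stale data in zk.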
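
Point 1 above can be sketched roughly as follows. This is only a minimal illustration, not the poster's actual run.sh: the function name wait_for_exit and the 60-second default are made up for the example, and the commented usage assumes the broker can be found by its main class kafka.Kafka (the class that appears in the stack trace above).

```shell
#!/usr/bin/env bash
# Wait until a process has exited, polling once per second.
# Returns 0 once the process is gone, 1 if it is still alive after the timeout.
# (wait_for_exit is a hypothetical helper name, not part of kafka.)
wait_for_exit() {
  local pid="$1" timeout="${2:-60}" waited=0
  # kill -0 sends no signal; it only checks whether the PID still exists.
  while kill -0 "$pid" 2>/dev/null; do
    if [ "$waited" -ge "$timeout" ]; then
      return 1
    fi
    sleep 1
    waited=$((waited + 1))
  done
  return 0
}

# Hypothetical usage inside run.sh, replacing the fixed "sleep 12"
# (the path is the one from the post; the pgrep pattern is an assumption):
#   /opt/local/kafka/kafka_2.13-3.0.0/bin/kafka-server-stop.sh
#   for pid in $(pgrep -f 'kafka\.Kafka'); do
#     wait_for_exit "$pid" 60 || echo "broker $pid is still running" >&2
#   done
```

The idea is simply to stop zk only after every broker process has been confirmed gone, instead of hoping that a fixed sleep was long enough.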

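Generally speaking, what is the correct way to shut down kafka and zk? Does run.sh need to check whether the kafka process still exists after every kafka stop command? Also, how does stale data in zk come about, and what is the best way to clean it up?
Thank you very much!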

Stopping kafka sometimes really does not take effect (no reason is given); in that case you can use kill.

Kill kafka, for example:

> ps | grep server-1.properties
7564 ttys002    0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.6/Home/bin/java... 
> kill -9 7564

On Windows, use:

> wmic process where "caption = 'java.exe' and commandline like '%server-1.properties%'" get processid
ProcessId
6016
> taskkill /pid 6016 /f
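
One caveat about kill -9: it ends the broker without a controlled shutdown, so its zk session is not closed cleanly and the ephemeral node under /brokers/ids can linger until that session times out. A gentler variant is to send SIGTERM first and fall back to SIGKILL only if the process is still there after a grace period. The sketch below is an illustration under that assumption; terminate_gracefully and the 30-second default are hypothetical names and values, not anything shipped with kafka.

```shell
#!/usr/bin/env bash
# Ask a process to exit with SIGTERM, and use SIGKILL only if it is still
# alive after a grace period (in seconds).
# Returns 0 if the process exited on its own, 1 if SIGKILL had to be sent.
# (terminate_gracefully is a hypothetical helper name.)
terminate_gracefully() {
  local pid="$1" grace="${2:-30}" waited=0
  # If the signal cannot be delivered, the process is already gone
  # (or is not ours to signal); nothing more to do here.
  kill -TERM "$pid" 2>/dev/null || return 0
  while kill -0 "$pid" 2>/dev/null; do
    if [ "$waited" -ge "$grace" ]; then
      kill -KILL "$pid" 2>/dev/null
      return 1
    fi
    sleep 1
    waited=$((waited + 1))
  done
  return 0
}

# Hypothetical usage with the PID from the example above:
#   terminate_gracefully 7564 30 || echo "had to use kill -9" >&2
```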

The way you stop zk is fine.

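I see. So the more reliable approach is to stop the kafka cluster first, wait until every kafka instance in the cluster has exited, and only then shut down the zk cluster. That way the problem I described above should not occur, right?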

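The broker registrations kafka keeps in zk are ephemeral nodes; they are released once kafka has shut down.
I suspect kafka had not really shut down, and that is what caused your problem.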
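
To check this suspicion before starting a broker again, one option is to list /brokers/ids with the zookeeper-shell.sh script that ships with kafka and see whether the old id is still registered. The helper below is only a sketch: broker_id_listed is a hypothetical name, it assumes the listing is printed as a bracketed line such as [1, 2, 3], and the zk address and port 2181 in the usage comment are assumptions that do not appear in the post.

```shell
#!/usr/bin/env bash
# Reads the output of "ls /brokers/ids" on stdin and succeeds (exit 0)
# if the given broker id appears in the bracketed list, e.g. "[1, 2, 3]".
# (broker_id_listed is a hypothetical helper name.)
broker_id_listed() {
  local id="$1"
  # Keep the last bracketed line, strip brackets and spaces,
  # put one id per line, then look for an exact match.
  grep -E '^\[.*\]$' | tail -n 1 | tr -d '[] ' | tr ',' '\n' | grep -qx "$id"
}

# Hypothetical usage (zk host and port are assumptions):
#   /opt/local/kafka/kafka_2.13-3.0.0/bin/zookeeper-shell.sh 10.168.45.21:2181 ls /brokers/ids \
#     | broker_id_listed 1 && echo "broker 1 is still registered in zk"
```

If the id is still listed while no broker process with that id is running, the old zk session has most likely not been closed or expired yet, and the entry should disappear once it does.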

That's possible. Thanks a lot!
