Edited 3 years ago

Kafka node migration causes cluster errors

kafka

We run two Kafka clusters in production:

  • Cluster A version: 0.10.0.1
  • Cluster B version: 0.10.2.1

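Cluster B recently ran short of resources, so I decommissioned 3 machines from cluster A in order to add them to cluster B. The decommissioning steps were:

  1. Migrate the Topic replicas on those three machines to other Brokers.
  2. Run bin/kafka-server-stop.sh on each of the three machines to stop the Kafka service.
  3. Clean up the Kafka installation directory, data directory, and log directory on the three machines.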
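For readers who want to reproduce step 1, here is a minimal sketch of how a replica reassignment plan could be generated. It is not the procedure I actually ran. The function name plan_reassignment, the broker ids, and the topic name are made up for illustration; the output follows the JSON layout that bin/kafka-reassign-partitions.sh accepts through --reassignment-json-file.

```python
import json
from itertools import cycle


def plan_reassignment(current, retiring, remaining):
    """Build a plan that moves replicas off the retiring brokers.

    current:   list of {"topic": str, "partition": int, "replicas": [int, ...]}
    retiring:  broker ids that are about to be shut down
    remaining: broker ids that stay in the cluster
    """
    retiring = set(retiring)
    remaining = sorted(set(remaining) - retiring)
    candidates = cycle(remaining)  # simple round-robin over the surviving brokers
    partitions = []
    for entry in current:
        replicas = list(entry["replicas"])
        if not retiring.intersection(replicas):
            continue  # this partition has no replica on a retiring broker
        if len(replicas) > len(remaining):
            raise ValueError("not enough remaining brokers for %r" % entry)
        kept = [b for b in replicas if b not in retiring]
        new_replicas = []
        for broker in replicas:
            if broker in retiring:
                # next surviving broker that does not already hold this partition
                broker = next(
                    b for b in candidates
                    if b not in kept and b not in new_replicas
                )
            new_replicas.append(broker)
        partitions.append({
            "topic": entry["topic"],
            "partition": entry["partition"],
            "replicas": new_replicas,
        })
    return {"version": 1, "partitions": partitions}


# Hypothetical assignment: broker 1 is being retired, brokers 2-4 stay.
current = [
    {"topic": "demo", "partition": 0, "replicas": [1, 2]},
    {"topic": "demo", "partition": 1, "replicas": [2, 3]},
]
print(json.dumps(plan_reassignment(current, retiring=[1], remaining=[2, 3, 4])))
```

The round-robin choice is only there to keep the sketch short; it does not balance load or consider racks, so a real plan should be reviewed before it is executed.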

After the decommissioning, cluster A ran normally.

Then I added the three machines to cluster B one at a time. After they joined, server.log on all three machines showed the following WARN entries:

[2021-10-15 11:12:12,140] INFO [Kafka Server 12], started (kafka.server.KafkaServer)
[2021-10-15 11:12:12,323] WARN Attempting to send response via channel for which there is no open connection, connection id 5 (kafka.network.Processor)
[2021-10-15 11:12:18,445] WARN Attempting to send response via channel for which there is no open connection, connection id 1 (kafka.network.Processor)
[2021-10-15 11:12:29,527] WARN Attempting to send response via channel for which there is no open connection, connection id 3 (kafka.network.Processor)
[2021-10-15 11:12:31,585] WARN Attempting to send response via channel for which there is no open connection, connection id 1 (kafka.network.Processor)
[2021-10-15 11:12:31,728] WARN Attempting to send response via channel for which there is no open connection, connection id 10 (kafka.network.Processor)
[2021-10-15 11:12:57,526] WARN Attempting to send response via channel for which there is no open connection, connection id 0 (kafka.network.Processor)

At the same time, the Brokers in cluster A started logging the following ERROR:

[2021-10-15 11:13:02,187] ERROR Closing socket for xxx:9092-xxxx:49691 because of error (kafka.network.Processor)
kafka.network.InvalidRequestException: Error getting request for apiKey: 3 and apiVersion: 2
        at kafka.network.RequestChannel$Request.liftedTree2$1(RequestChannel.scala:95)
        at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:87)
        at kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:488)
        at kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:483)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at kafka.network.Processor.processCompletedReceives(SocketServer.scala:483)
        at kafka.network.Processor.run(SocketServer.scala:413)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalArgumentException: Invalid version for API key 3: 2
        at org.apache.kafka.common.protocol.ProtoUtils.schemaFor(ProtoUtils.java:31)
        at org.apache.kafka.common.protocol.ProtoUtils.requestSchema(ProtoUtils.java:44)
        at org.apache.kafka.common.protocol.ProtoUtils.parseRequest(ProtoUtils.java:60)
        at org.apache.kafka.common.requests.MetadataRequest.parse(MetadataRequest.java:96)
        at org.apache.kafka.common.requests.AbstractRequest.getRequest(AbstractRequest.java:48)
        at kafka.network.RequestChannel$Request.liftedTree2$1(RequestChannel.scala:92)
        ... 10 more
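To narrow down who is sending these requests, the ERROR entries can be grouped by peer host. The sketch below is only an illustration: the function name summarize is made up, and it assumes the connection string in the log has the form localHost:localPort-remoteHost:remotePort, which I have not verified for every Kafka version. According to the stack trace (MetadataRequest.parse), apiKey 3 is a metadata request, and the cluster A Broker rejects version 2 of it.

```python
import re
from collections import Counter

# First line of each error: which connection was closed.
SOCKET_RE = re.compile(
    r"ERROR Closing socket for (?P<local>\S+?:\d+)-(?P<remote>\S+):(?P<port>\d+) because of error"
)
# Second line: which request the Broker could not parse.
API_RE = re.compile(
    r"Error getting request for apiKey: (?P<key>\d+) and apiVersion: (?P<version>\d+)"
)


def summarize(lines):
    """Count rejected requests per (remote host, apiKey, apiVersion)."""
    counts = Counter()
    remote = None
    for line in lines:
        m = SOCKET_RE.search(line)
        if m:
            remote = m.group("remote")
            continue
        m = API_RE.search(line)
        if m and remote is not None:
            counts[(remote, int(m.group("key")), int(m.group("version")))] += 1
            remote = None  # wait for the next "Closing socket" line
    return counts


sample = [
    "[2021-10-15 11:13:02,187] ERROR Closing socket for xxx:9092-xxxx:49691 because of error (kafka.network.Processor)",
    "kafka.network.InvalidRequestException: Error getting request for apiKey: 3 and apiVersion: 2",
]
for (host, key, version), n in summarize(sample).items():
    print(host, key, version, n)
```

If the peer hosts turn out to be the three migrated machines, or clients that still list them, that would at least show where the unsupported requests come from.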

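Was the way I decommissioned the Brokers from cluster A wrong? Why was there no problem after the three Brokers were taken offline, yet adding them to cluster B causes errors in cluster A?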

Has anyone run into this before?