停掉消费者和生产者清空Kafka数据和zookeeper数据,双副本模式下18分区的数据多次会出现数据情况1的错误,查看状态数据如模式2,但是在三副本模式下重复上述操作则没有这种情况,而且看集群副本状态也正常。
双副本模式下看到Kafka的server.log的日志,有如数据情况3的报错
情况1
PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
18 4294925441 4266388068 -28537373
18 500 4266387988 4266387488
情况2
PartitionCount: 30 ReplicationFactor: 2 Configs:
Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0
Partition: 1 Leader: 1 Replicas: 1,0 Isr: 1
Partition: 2 Leader: 0 Replicas: 0,1 Isr: 0
Partition: 3 Leader: 0 Replicas: 0,2 Isr: 0
Partition: 4 Leader: 1 Replicas: 1,0 Isr: 1
Partition: 5 Leader: 2 Replicas: 2,1 Isr: 2,1
Partition: 6 Leader: 0 Replicas: 0,1 Isr: 0
Partition: 7 Leader: 1 Replicas: 1,2 Isr: 1
Partition: 8 Leader: 2 Replicas: 2,0 Isr: 2,0
Partition: 9 Leader: 0 Replicas: 0,2 Isr: 0
Partition: 10 Leader: 1 Replicas: 1,0 Isr: 1
Partition: 11 Leader: 2 Replicas: 2,1 Isr: 2,1
Partition: 12 Leader: 0 Replicas: 0,1 Isr: 0
Partition: 13 Leader: 1 Replicas: 1,2 Isr: 1
Partition: 14 Leader: 2 Replicas: 2,0 Isr: 2,0
Partition: 15 Leader: 0 Replicas: 0,2 Isr: 0
Partition: 16 Leader: 1 Replicas: 1,0 Isr: 1
Partition: 17 Leader: 2 Replicas: 2,1 Isr: 2,1
Partition: 18 Leader: 0 Replicas: 0,1 Isr: 0
Partition: 19 Leader: 1 Replicas: 1,2 Isr: 1
Partition: 20 Leader: 2 Replicas: 2,0 Isr: 2,0
Partition: 21 Leader: 0 Replicas: 0,2 Isr: 0
Partition: 22 Leader: 1 Replicas: 1,0 Isr: 1
Partition: 23 Leader: 2 Replicas: 2,1 Isr: 2,1
Partition: 24 Leader: 0 Replicas: 0,1 Isr: 0
Partition: 25 Leader: 1 Replicas: 1,2 Isr: 1
Partition: 26 Leader: 2 Replicas: 2,0 Isr: 2,0
Partition: 27 Leader: 0 Replicas: 0,2 Isr: 0
Partition: 28 Leader: 1 Replicas: 1,0 Isr: 1
Partition: 29 Leader: 2 Replicas: 2,1 Isr: 2,1
情况3
[2021-11-23 18:17:53,925] ERROR [ReplicaFetcher replicaId=1, leaderId=3, fetcherId=1] Unexpected error occurred while processing data for partition __consumer_offsets-46 at offset 5 (kafka.server.ReplicaFetcherThread)
kafka.common.OffsetsOutOfOrderException: Out of order offsets found in append to __consumer_offsets-46: List(4272827160, 4271343398, 4271343399, 4271343400)
at kafka.log.Log.$anonfun$append$2(Log.scala:1161)
at kafka.log.Log.append(Log.scala:2404)
at kafka.log.Log.appendAsFollower(Log.scala:1071)
at kafka.cluster.Partition.doAppendRecordsToFollowerOrFutureReplica(Partition.scala:1034)
at kafka.cluster.Partition.appendRecordsToFollowerOrFutureReplica(Partition.scala:1041)
at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:172)
at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:333)
at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6(AbstractFetcherThread.scala:321)
at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted(AbstractFetcherThread.scala:320)
at kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62)
at scala.collection.compat.MapExtensionMethods$.$anonfun$foreachEntry$1(PackageShared.scala:431)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$5(AbstractFetcherThread.scala:320)
at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:320)
at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:136)
at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:135)
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:118)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)