Kafka 2.0: every restart after a failure rescans the log files, so recovery is very slow. Is something not configured?

主宰思想 posted: 2020-05-14   Last updated: 2022-02-12 13:10:55   3,710 views

On Kafka 2.0, the broker loads the local log files after a restart, which makes recovery take a long time.

Detailed logs:

[2020-05-14 17:44:59,993] INFO [ProducerStateManager partition=ttt-2] Loading producer state from snapshot file '/mnt/vdh/kafka-logs/ttt-2/00000000000001143024.snapshot' (kafka.log.ProducerStateManager)
[2020-05-14 17:44:59,994] INFO [ProducerStateManager partition=ttt-14] Loading producer state from snapshot file '/mnt/vdc/kafka-logs/ttt-14/00000000000002552144.snapshot' (kafka.log.ProducerStateManager)
[2020-05-14 17:45:00,001] INFO [ProducerStateManager partition=ttt-0] Writing producer snapshot at offset 84541 (kafka.log.ProducerStateManager)
[2020-05-14 17:45:00,002] INFO [Log partition=ttt-0, dir=/mnt/vdc/kafka-logs] Recovering unflushed segment 84541 (kafka.log.Log)
[2020-05-14 17:45:00,002] INFO [Log partition=ttt-0, dir=/mnt/vdc/kafka-logs] Loading producer state till offset 84541 with message format version 2 (kafka.log.Log)
[2020-05-14 17:45:00,006] INFO [ProducerStateManager partition=ttt-0] Loading producer state from snapshot file '/mnt/vdc/kafka-logs/ttt-0/00000000000000084541.snapshot' (kafka.log.ProducerStateManager)
[2020-05-14 17:45:00,006] INFO [ProducerStateManager partition=xxx-14] Writing producer snapshot at offset 79643 (kafka.log.ProducerStateManager)
[2020-05-14 17:45:00,007] INFO [Log partition=xxx-14, dir=/mnt/vdc/kafka-logs] Recovering unflushed segment 79643 (kafka.log.Log)
[2020-05-14 17:45:00,007] INFO [Log partition=xxx-14, dir=/mnt/vdc/kafka-logs] Loading producer state till offset 79643 with message format version 2 (kafka.log.Log)
[2020-05-14 17:45:00,007] INFO [ProducerStateManager partition=xxx-11] Writing producer snapshot at offset 92353 (kafka.log.ProducerStateManager)
[2020-05-14 17:45:00,007] INFO [Log partition=xxx-11, dir=/mnt/vdl/kafka-logs] Recovering unflushed segment 92353 (kafka.log.Log)
[2020-05-14 17:45:00,008] INFO [ProducerStateManager partition=xxx-7] Loading producer state from snapshot file '/mnt/vdc/kafka-logs/xxx-7/00000000000000529509.snapshot' (kafka.log.ProducerStateManager)
[2020-05-14 17:45:00,008] INFO [Log partition=xxx-11, dir=/mnt/vdl/kafka-logs] Loading producer state till offset 92353 with message format version 2 (kafka.log.Log)
[2020-05-14 17:45:00,010] INFO [ProducerStateManager partition=xxx-5] Writing producer snapshot at offset 529393 (kafka.log.ProducerStateManager)
[2020-05-14 17:45:00,010] INFO [Log partition=xxx-5, dir=/mnt/vdd/kafka-logs] Recovering unflushed segment 529393 (kafka.log.Log)
[2020-05-14 17:45:00,010] INFO [Log partition=xxx-5, dir=/mnt/vdd/kafka-logs] Loading producer state till offset 529393 with message format version 2 (kafka.log.Log)
[2020-05-14 17:45:00,011] INFO [ProducerStateManager partition=xxx-6] Writing producer snapshot at offset 87516 (kafka.log.ProducerStateManager)
[2020-05-14 17:45:00,011] INFO [Log partition=xxx-6, dir=/mnt/vdc/kafka-logs] Recovering unflushed segment 87516 (kafka.log.Log)
[2020-05-14 17:45:00,011] INFO [Log partition=xxx-6, dir=/mnt/vdc/kafka-logs] Loading producer state till offset 87516 with message format version 2 (kafka.log.Log)
[2020-05-14 17:45:00,012] INFO [ProducerStateManager partition=xxx-11] Loading producer state from snapshot file '/mnt/vdl/kafka-logs/xxx-11/00000000000000092353.snapshot' (kafka.log.ProducerStateManager)
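The repeated "Recovering unflushed segment N" lines come from the unclean-shutdown path in kafka.log.Log. When a log directory has no clean-shutdown marker, the broker re-reads and re-validates every segment from the one containing the partition's recovery point (the offset up to which the log was known to be flushed) through the end of the log, rebuilding the segment indexes. The Scala sketch below is a simplified, hypothetical model of which segments that selects; `UnflushedSegmentsModel` and `segmentsToRecover` are illustrative names, not Kafka's API.

```scala
// Hypothetical model of segment selection during unclean-shutdown recovery.
// Not Kafka code: names are illustrative only.
object UnflushedSegmentsModel {
  // segmentBaseOffsets: base offsets of one partition's segments (any order).
  // recoveryPoint: offset up to which the log is assumed to have been flushed.
  // Returns the base offsets of the segments that would be re-read on restart.
  def segmentsToRecover(segmentBaseOffsets: Seq[Long],
                        recoveryPoint: Long,
                        hadCleanShutdown: Boolean): Seq[Long] =
    if (hadCleanShutdown) Seq.empty // clean shutdown: recovery is skipped
    else {
      val sorted = segmentBaseOffsets.sorted
      // The segment containing the recovery point is the last one whose base
      // offset is <= recoveryPoint; it and every later segment are re-read.
      sorted.filter(_ <= recoveryPoint).lastOption match {
        case Some(first) => sorted.filter(_ >= first)
        case None        => sorted // recovery point precedes all segments
      }
    }
}
```

For example, with segments at base offsets 0, 1000 and 2000 and a recovery point of 1500, only the segments starting at 1000 and 2000 are re-read; after a clean shutdown, none are.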

I looked at the source: this comes from the Log code (kafka.log.Log), in the recovery check:

// We want to avoid unnecessary scanning of the log to build the producer state when the broker is being
// upgraded. The basic idea is to use the absence of producer snapshot files to detect the upgrade case,
// but we have to be careful not to assume too much in the presence of broker failures. The two most common
// upgrade cases in which we expect to find no snapshots are the following:
//
// 1. The broker has been upgraded, but the topic is still on the old message format.
// 2. The broker has been upgraded, the topic is on the new message format, and we had a clean shutdown.
//
// If we hit either of these cases, we skip producer state loading and write a new snapshot at the log end
// offset (see below). The next time the log is reloaded, we will load producer state using this snapshot
// (or later snapshots). Otherwise, if there is no snapshot file, then we have to rebuild producer state
// from the first segment.
// This checks whether the log uses the v2 message format
if (messageFormatVersion < RecordBatch.MAGIC_VALUE_V2 ||
    (producerStateManager.latestSnapshotOffset.isEmpty && reloadFromCleanShutdown)) {
  // To avoid an expensive scan through all of the segments, we take empty snapshots from the start of the
  // last two segments and the last offset. This should avoid the full scan in the case that the log needs
  // truncation.
  offsetsToSnapshot.flatten.foreach { offset =>
    producerStateManager.updateMapEndOffset(offset)
    producerStateManager.takeSnapshot()
  }
} else {
  val isEmptyBeforeTruncation = producerStateManager.isEmpty && producerStateManager.mapEndOffset >= lastOffset
  // With the v2 message format this branch runs; truncateAndReload reloads the log files, so loading several hundred GB of data takes a very long time
  producerStateManager.truncateAndReload(logStartOffset, lastOffset, time.milliseconds())

  // Only do the potentially expensive reloading if the last snapshot offset is lower than the log end
  // offset (which would be the case on first startup) and there were active producers prior to truncation
  // (which could be the case if truncating after initial loading). If there weren't, then truncating
  // shouldn't change that fact (although it could cause a producerId to expire earlier than expected),
  // and we can skip the loading. This is an optimization for users which are not yet using
  // idempotent/transactional features yet.
  if (lastOffset > producerStateManager.mapEndOffset && !isEmptyBeforeTruncation) {
    logSegments(producerStateManager.mapEndOffset, lastOffset).foreach { segment =>
      val startOffset = Utils.max(segment.baseOffset, producerStateManager.mapEndOffset, logStartOffset)
      producerStateManager.updateMapEndOffset(startOffset)

      if (offsetsToSnapshot.contains(Some(segment.baseOffset)))
      // ... (rest of the method omitted)
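To make the branch condition in the excerpt easier to follow, here is a minimal, self-contained Scala model of which path is taken and what range gets read. It is a sketch under stated assumptions, not Kafka code: `ProducerStateLoadModel`, `Plan` and its cases are hypothetical names; `truncateAndReload` is approximated as "resume from the latest snapshot, or from logStartOffset if there is none"; and the `isEmptyBeforeTruncation` shortcut is left out. The constant 2 mirrors `RecordBatch.MAGIC_VALUE_V2`.

```scala
// Simplified, hypothetical model of the producer-state branch quoted above.
object ProducerStateLoadModel {
  // Mirrors RecordBatch.MAGIC_VALUE_V2 (message format v2).
  val MagicValueV2: Byte = 2

  sealed trait Plan
  // Cheap path: only empty snapshots are written, no segment is read.
  case object WriteEmptySnapshotsOnly extends Plan
  // Expensive path: segments covering [from, to) are read to rebuild producer state.
  final case class ScanSegments(from: Long, to: Long) extends Plan
  // The snapshot already reaches the log end, so nothing is read.
  case object NothingToScan extends Plan

  // Assumption: latestSnapshotOffset, if present, already lies in [logStartOffset, lastOffset].
  def plan(messageFormatVersion: Byte,
           latestSnapshotOffset: Option[Long],
           reloadFromCleanShutdown: Boolean,
           logStartOffset: Long,
           lastOffset: Long): Plan =
    if (messageFormatVersion < MagicValueV2 ||
        (latestSnapshotOffset.isEmpty && reloadFromCleanShutdown)) {
      WriteEmptySnapshotsOnly
    } else {
      // Approximation of truncateAndReload: resume from the latest snapshot,
      // or from the log start when no snapshot exists.
      val mapEndOffset = latestSnapshotOffset.getOrElse(logStartOffset)
      if (lastOffset > mapEndOffset) ScanSegments(mapEndOffset, lastOffset)
      else NothingToScan
    }
}
```

Under this model, the full rebuild from the first segment only happens for a v2 log that has no producer snapshot and did not shut down cleanly; when a snapshot exists, reading starts at that snapshot's offset.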

Is there something I haven't configured that is making recovery this slow? Any advice would be much appreciated.

Posted 2020-05-14

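How long does it take? Kafka keeps its data on disk; it doesn't load the logs up front at startup.
What you're seeing happens after startup: Kafka is already working normally, and it then resumes consumption step by step according to each consumer's state.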

主宰思想 -> 半兽人 4 years ago

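There are many topics and a lot of data, and recovery takes 3 hours. During recovery the ISR stays incomplete or the leader is -1. By "recovery" I mean the point where the broker starts loading new data; only after that does the ISR fill back in and the leader come back.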

半兽人 -> 主宰思想 4 years ago

Leader is -1: did the whole cluster go down?
Can you describe what kind of failure it was?

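I'm hitting the same problem, also on 2.0. The broker is still doing the loading/writing operations after 5 hours and hasn't finished recovering. Does anyone have a good solution?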
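On the configuration question: in Kafka 2.0, startup log recovery for each data directory runs on a fixed pool of `num.recovery.threads.per.data.dir` threads (default 1), with one job per partition directory, and the broker waits for every data directory to finish. Raising the setting does not reduce the total work, but it lets more partitions on the same disk recover in parallel. The Scala sketch below is a hypothetical model of that scheduling, not Kafka code: `RecoveryTimeModel` is an illustrative name, the per-partition costs are made-up inputs, and disk contention between threads is ignored.

```scala
// Hypothetical model of startup log recovery wall-clock time; not Kafka code.
// Costs are arbitrary time units per partition (e.g. seconds).
object RecoveryTimeModel {
  // One data dir: a fixed pool of `threadsPerDir` workers takes partition jobs
  // in submission order, each job going to whichever worker frees up first.
  def dirMakespan(partitionCosts: Seq[Long], threadsPerDir: Int): Long = {
    require(threadsPerDir >= 1, "threadsPerDir must be at least 1")
    val busyUntil = Array.fill(threadsPerDir)(0L)
    partitionCosts.foreach { cost =>
      val next = busyUntil.indices.minBy(i => busyUntil(i)) // earliest-free worker
      busyUntil(next) += cost
    }
    busyUntil.max
  }

  // Data dirs recover concurrently, so the broker waits for the slowest one.
  def brokerMakespan(costsPerDir: Map[String, Seq[Long]], threadsPerDir: Int): Long =
    if (costsPerDir.isEmpty) 0L
    else costsPerDir.values.map(costs => dirMakespan(costs, threadsPerDir)).max
}
```

For example, four partitions costing 4 units each on one disk take 16 units with one thread and 4 units with four threads in this model; in practice the gain is bounded by how much parallel I/O each disk can sustain.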
