Kafka 2.0: after every failure the broker re-scans the log files on restart, making recovery very slow. Is there something I haven't configured?

kafka

Running Kafka 2.0, the broker reloads the local log files after a restart, which makes recovery take a long time.
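As far as I know, the broker setting that most directly affects this startup recovery scan is num.recovery.threads.per.data.dir, which defaults to a single thread per log directory. For illustration only (the value below is an example, not my actual config):

# server.properties (example value only)
# Threads per data directory used for log recovery at startup and flushing at shutdown; default is 1.
num.recovery.threads.per.data.dir=8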

The relevant log output:

[2020-05-14 17:44:59,993] INFO [ProducerStateManager partition=ttt-2] Loading producer state from snapshot file '/mnt/vdh/kafka-logs/ttt-2/00000000000001143024.snapshot' (kafka.log.ProducerStateManager)
[2020-05-14 17:44:59,994] INFO [ProducerStateManager partition=ttt-14] Loading producer state from snapshot file '/mnt/vdc/kafka-logs/ttt-14/00000000000002552144.snapshot' (kafka.log.ProducerStateManager)
[2020-05-14 17:45:00,001] INFO [ProducerStateManager partition=ttt-0] Writing producer snapshot at offset 84541 (kafka.log.ProducerStateManager)
[2020-05-14 17:45:00,002] INFO [Log partition=ttt-0, dir=/mnt/vdc/kafka-logs] Recovering unflushed segment 84541 (kafka.log.Log)
[2020-05-14 17:45:00,002] INFO [Log partition=ttt-0, dir=/mnt/vdc/kafka-logs] Loading producer state till offset 84541 with message format version 2 (kafka.log.Log)
[2020-05-14 17:45:00,006] INFO [ProducerStateManager partition=ttt-0] Loading producer state from snapshot file '/mnt/vdc/kafka-logs/ttt-0/00000000000000084541.snapshot' (kafka.log.ProducerStateManager)
[2020-05-14 17:45:00,006] INFO [ProducerStateManager partition=xxx-14] Writing producer snapshot at offset 79643 (kafka.log.ProducerStateManager)
[2020-05-14 17:45:00,007] INFO [Log partition=xxx-14, dir=/mnt/vdc/kafka-logs] Recovering unflushed segment 79643 (kafka.log.Log)
[2020-05-14 17:45:00,007] INFO [Log partition=xxx-14, dir=/mnt/vdc/kafka-logs] Loading producer state till offset 79643 with message format version 2 (kafka.log.Log)
[2020-05-14 17:45:00,007] INFO [ProducerStateManager partition=xxx-11] Writing producer snapshot at offset 92353 (kafka.log.ProducerStateManager)
[2020-05-14 17:45:00,007] INFO [Log partition=xxx-11, dir=/mnt/vdl/kafka-logs] Recovering unflushed segment 92353 (kafka.log.Log)
[2020-05-14 17:45:00,008] INFO [ProducerStateManager partition=xxx-7] Loading producer state from snapshot file '/mnt/vdc/kafka-logs/xxx-7/00000000000000529509.snapshot' (kafka.log.ProducerStateManager)
[2020-05-14 17:45:00,008] INFO [Log partition=xxx-11, dir=/mnt/vdl/kafka-logs] Loading producer state till offset 92353 with message format version 2 (kafka.log.Log)
[2020-05-14 17:45:00,010] INFO [ProducerStateManager partition=xxx-5] Writing producer snapshot at offset 529393 (kafka.log.ProducerStateManager)
[2020-05-14 17:45:00,010] INFO [Log partition=xxx-5, dir=/mnt/vdd/kafka-logs] Recovering unflushed segment 529393 (kafka.log.Log)
[2020-05-14 17:45:00,010] INFO [Log partition=xxx-5, dir=/mnt/vdd/kafka-logs] Loading producer state till offset 529393 with message format version 2 (kafka.log.Log)
[2020-05-14 17:45:00,011] INFO [ProducerStateManager partition=xxx-6] Writing producer snapshot at offset 87516 (kafka.log.ProducerStateManager)
[2020-05-14 17:45:00,011] INFO [Log partition=xxx-6, dir=/mnt/vdc/kafka-logs] Recovering unflushed segment 87516 (kafka.log.Log)
[2020-05-14 17:45:00,011] INFO [Log partition=xxx-6, dir=/mnt/vdc/kafka-logs] Loading producer state till offset 87516 with message format version 2 (kafka.log.Log)
[2020-05-14 17:45:00,012] INFO [ProducerStateManager partition=xxx-11] Loading producer state from snapshot file '/mnt/vdl/kafka-logs/xxx-11/00000000000000092353.snapshot' (kafka.log.ProducerStateManager)

I had a look at the source code; the part in question is in kafka.log.Log, where recovery is checked while loading producer state:

// We want to avoid unnecessary scanning of the log to build the producer state when the broker is being
// upgraded. The basic idea is to use the absence of producer snapshot files to detect the upgrade case,
// but we have to be careful not to assume too much in the presence of broker failures. The two most common
// upgrade cases in which we expect to find no snapshots are the following:
//
// 1. The broker has been upgraded, but the topic is still on the old message format.
// 2. The broker has been upgraded, the topic is on the new message format, and we had a clean shutdown.
//
// If we hit either of these cases, we skip producer state loading and write a new snapshot at the log end
// offset (see below). The next time the log is reloaded, we will load producer state using this snapshot
// (or later snapshots). Otherwise, if there is no snapshot file, then we have to rebuild producer state
// from the first segment.
//My note: this checks the message format version (and whether there is a snapshot after a clean shutdown)
if (messageFormatVersion < RecordBatch.MAGIC_VALUE_V2 ||
    (producerStateManager.latestSnapshotOffset.isEmpty && reloadFromCleanShutdown)) {
  // To avoid an expensive scan through all of the segments, we take empty snapshots from the start of the
  // last two segments and the last offset. This should avoid the full scan in the case that the log needs
  // truncation.
  offsetsToSnapshot.flatten.foreach { offset =>
    producerStateManager.updateMapEndOffset(offset)
    producerStateManager.takeSnapshot()
  }
} else {
  val isEmptyBeforeTruncation = producerStateManager.isEmpty && producerStateManager.mapEndOffset >= lastOffset
  //My note: with the v2 message format this branch runs; truncateAndReload reloads the log files, and loading several hundred GB of data takes a very long time
  producerStateManager.truncateAndReload(logStartOffset, lastOffset, time.milliseconds())

  // Only do the potentially expensive reloading if the last snapshot offset is lower than the log end
  // offset (which would be the case on first startup) and there were active producers prior to truncation
  // (which could be the case if truncating after initial loading). If there weren't, then truncating
  // shouldn't change that fact (although it could cause a producerId to expire earlier than expected),
  // and we can skip the loading. This is an optimization for users which are not yet using
  // idempotent/transactional features yet.
  if (lastOffset > producerStateManager.mapEndOffset && !isEmptyBeforeTruncation) {
    logSegments(producerStateManager.mapEndOffset, lastOffset).foreach { segment =>
      val startOffset = Utils.max(segment.baseOffset, producerStateManager.mapEndOffset, logStartOffset)
      producerStateManager.updateMapEndOffset(startOffset)

      if (offsetsToSnapshot.contains(Some(segment.baseOffset)))
        producerStateManager.takeSnapshot()
      // ... rest of the method omitted: it reads each remaining segment and
      // rebuilds the producer state from its records
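To check that I am reading the branch above correctly, here is my own simplified paraphrase of the decision (not actual Kafka code; the three boolean parameters just stand in for the real checks in Log.loadProducerState):

// My simplified paraphrase of the branch above -- not actual Kafka code.
// preV2Format / hasSnapshot / cleanShutdown stand in for the real checks.
object ProducerStateRecoverySketch {

  def recoveryPath(preV2Format: Boolean, hasSnapshot: Boolean, cleanShutdown: Boolean): String =
    if (preV2Format || (!hasSnapshot && cleanShutdown))
      "cheap path: only write empty snapshots at the last two segment base offsets and the log end"
    else
      "expensive path: truncateAndReload, then scan the segments after the latest snapshot to rebuild producer state"

  def main(args: Array[String]): Unit = {
    // My case: v2 message format, snapshots exist, but the shutdown was not clean,
    // so every restart takes the expensive path.
    println(recoveryPath(preV2Format = false, hasSnapshot = true, cleanShutdown = false))
  }
}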
Is the slow recovery caused by something I have not configured? Any pointers would be much appreciated.