On Kafka 2.0, the broker reloads the local log files after a restart, which makes recovery take a long time.
Relevant log output:
[2020-05-14 17:44:59,993] INFO [ProducerStateManager partition=ttt-2] Loading producer state from snapshot file '/mnt/vdh/kafka-logs/ttt-2/00000000000001143024.snapshot' (kafka.log.ProducerStateManager)
[2020-05-14 17:44:59,994] INFO [ProducerStateManager partition=ttt-14] Loading producer state from snapshot file '/mnt/vdc/kafka-logs/ttt-14/00000000000002552144.snapshot' (kafka.log.ProducerStateManager)
[2020-05-14 17:45:00,001] INFO [ProducerStateManager partition=ttt-0] Writing producer snapshot at offset 84541 (kafka.log.ProducerStateManager)
[2020-05-14 17:45:00,002] INFO [Log partition=ttt-0, dir=/mnt/vdc/kafka-logs] Recovering unflushed segment 84541 (kafka.log.Log)
[2020-05-14 17:45:00,002] INFO [Log partition=ttt-0, dir=/mnt/vdc/kafka-logs] Loading producer state till offset 84541 with message format version 2 (kafka.log.Log)
[2020-05-14 17:45:00,006] INFO [ProducerStateManager partition=ttt-0] Loading producer state from snapshot file '/mnt/vdc/kafka-logs/ttt-0/00000000000000084541.snapshot' (kafka.log.ProducerStateManager)
[2020-05-14 17:45:00,006] INFO [ProducerStateManager partition=xxx-14] Writing producer snapshot at offset 79643 (kafka.log.ProducerStateManager)
[2020-05-14 17:45:00,007] INFO [Log partition=xxx-14, dir=/mnt/vdc/kafka-logs] Recovering unflushed segment 79643 (kafka.log.Log)
[2020-05-14 17:45:00,007] INFO [Log partition=xxx-14, dir=/mnt/vdc/kafka-logs] Loading producer state till offset 79643 with message format version 2 (kafka.log.Log)
[2020-05-14 17:45:00,007] INFO [ProducerStateManager partition=xxx-11] Writing producer snapshot at offset 92353 (kafka.log.ProducerStateManager)
[2020-05-14 17:45:00,007] INFO [Log partition=xxx-11, dir=/mnt/vdl/kafka-logs] Recovering unflushed segment 92353 (kafka.log.Log)
[2020-05-14 17:45:00,008] INFO [ProducerStateManager partition=xxx-7] Loading producer state from snapshot file '/mnt/vdc/kafka-logs/xxx-7/00000000000000529509.snapshot' (kafka.log.ProducerStateManager)
[2020-05-14 17:45:00,008] INFO [Log partition=xxx-11, dir=/mnt/vdl/kafka-logs] Loading producer state till offset 92353 with message format version 2 (kafka.log.Log)
[2020-05-14 17:45:00,010] INFO [ProducerStateManager partition=xxx-5] Writing producer snapshot at offset 529393 (kafka.log.ProducerStateManager)
[2020-05-14 17:45:00,010] INFO [Log partition=xxx-5, dir=/mnt/vdd/kafka-logs] Recovering unflushed segment 529393 (kafka.log.Log)
[2020-05-14 17:45:00,010] INFO [Log partition=xxx-5, dir=/mnt/vdd/kafka-logs] Loading producer state till offset 529393 with message format version 2 (kafka.log.Log)
[2020-05-14 17:45:00,011] INFO [ProducerStateManager partition=xxx-6] Writing producer snapshot at offset 87516 (kafka.log.ProducerStateManager)
[2020-05-14 17:45:00,011] INFO [Log partition=xxx-6, dir=/mnt/vdc/kafka-logs] Recovering unflushed segment 87516 (kafka.log.Log)
[2020-05-14 17:45:00,011] INFO [Log partition=xxx-6, dir=/mnt/vdc/kafka-logs] Loading producer state till offset 87516 with message format version 2 (kafka.log.Log)
[2020-05-14 17:45:00,012] INFO [ProducerStateManager partition=xxx-11] Loading producer state from snapshot file '/mnt/vdl/kafka-logs/xxx-11/00000000000000092353.snapshot' (kafka.log.ProducerStateManager)
I took a look at the source; this comes from kafka.log.Log, in the recovery check / producer state loading path:
// We want to avoid unnecessary scanning of the log to build the producer state when the broker is being
// upgraded. The basic idea is to use the absence of producer snapshot files to detect the upgrade case,
// but we have to be careful not to assume too much in the presence of broker failures. The two most common
// upgrade cases in which we expect to find no snapshots are the following:
//
// 1. The broker has been upgraded, but the topic is still on the old message format.
// 2. The broker has been upgraded, the topic is on the new message format, and we had a clean shutdown.
//
// If we hit either of these cases, we skip producer state loading and write a new snapshot at the log end
// offset (see below). The next time the log is reloaded, we will load producer state using this snapshot
// (or later snapshots). Otherwise, if there is no snapshot file, then we have to rebuild producer state
// from the first segment.
// this is where it checks whether the message format is v2
if (messageFormatVersion < RecordBatch.MAGIC_VALUE_V2 ||
    (producerStateManager.latestSnapshotOffset.isEmpty && reloadFromCleanShutdown)) {
  // To avoid an expensive scan through all of the segments, we take empty snapshots from the start of the
  // last two segments and the last offset. This should avoid the full scan in the case that the log needs
  // truncation.
  offsetsToSnapshot.flatten.foreach { offset =>
    producerStateManager.updateMapEndOffset(offset)
    producerStateManager.takeSnapshot()
  }
} else {
  val isEmptyBeforeTruncation = producerStateManager.isEmpty && producerStateManager.mapEndOffset >= lastOffset
  // with the v2 message format this branch runs; truncateAndReload re-reads the log files,
  // so loading several hundred GB of data takes a very long time
  producerStateManager.truncateAndReload(logStartOffset, lastOffset, time.milliseconds())
  // Only do the potentially expensive reloading if the last snapshot offset is lower than the log end
  // offset (which would be the case on first startup) and there were active producers prior to truncation
  // (which could be the case if truncating after initial loading). If there weren't, then truncating
  // shouldn't change that fact (although it could cause a producerId to expire earlier than expected),
  // and we can skip the loading. This is an optimization for users which are not yet using
  // idempotent/transactional features yet.
  if (lastOffset > producerStateManager.mapEndOffset && !isEmptyBeforeTruncation) {
    logSegments(producerStateManager.mapEndOffset, lastOffset).foreach { segment =>
      val startOffset = Utils.max(segment.baseOffset, producerStateManager.mapEndOffset, logStartOffset)
      producerStateManager.updateMapEndOffset(startOffset)
      if (offsetsToSnapshot.contains(Some(segment.baseOffset)))
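The excerpt is cut off at that last if; judging from the comment block at the top, the rest of that loop reads each segment's records to rebuild the producer state, which is the part that takes so long for a large log. To restate the branch condition above in a simplified form (a sketch for illustration only; ProducerStateLoadCheck and its parameter names are mine, not Kafka code):

object ProducerStateLoadCheck {
  // The segment scan can be skipped when the topic is still on a pre-v2 message format,
  // or when there is no producer snapshot and the previous shutdown was clean.
  // Even when this returns true, the inner check in the else branch may still avoid the scan.
  def mayNeedExpensiveScan(messageFormatIsV2: Boolean,
                           hasSnapshot: Boolean,
                           cleanShutdown: Boolean): Boolean =
    messageFormatIsV2 && (hasSnapshot || !cleanShutdown)

  def main(args: Array[String]): Unit = {
    // "Recovering unflushed segment" in the logs above suggests the shutdown was not clean,
    // so a v2-format topic ends up on the slow branch (truncateAndReload plus segment scan).
    println(mayNeedExpensiveScan(messageFormatIsV2 = true, hasSnapshot = true, cleanShutdown = false))  // true
    // With a clean shutdown and no existing snapshot, the scan is skipped and only empty snapshots are written.
    println(mayNeedExpensiveScan(messageFormatIsV2 = true, hasSnapshot = false, cleanShutdown = true))  // false
  }
}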
Is there something I failed to configure that is causing the slow recovery? Any pointers would be appreciated.
How long does it take? Kafka keeps its data on disk; it does not load the log up front at startup.
What you are describing happens after startup: Kafka is already running normally, and it then resumes consumption in turn based on the consumers' state.
There are a lot of topics and the data volume is large; it takes about 3 hours to recover. During recovery the ISR stays incomplete, or the leader shows as -1. By "recover" I mean the point where it starts loading new data; only after that does the ISR fill back in and the leader come back.
Leader -1? Does that mean the whole cluster went down? Could you describe what kind of failure it was?
I am running into this problem now as well, also on version 2.0. The broker is still doing the loading/writing operations and has been running for 5 hours without finishing recovery. Have you found a good way to deal with this?