从老版本升级kafka

原创
半兽人 发表于: 2017-01-16   最后更新时间: 2018-05-15 20:18:42  
{{totalSubscript}} 订阅, 41,055 游览

从老版本升级kafka

从0.8.x, 0.9.x 或 0.10.0.X 升级到 0.10.1.0

0.10.1.0有线协议更改,通过遵循以下建议的滚动升级,在升级期间不会停机。但是,需要注意升0.10.1.0中潜在的突发状况。

注意:由于引入了新的协议,要在升级客户端之前先升级kafka集群(即,0.10.1.x仅支持 0.10.1.x或更高版本的broker,但是0.10.1.x的broker向下支持旧版本的客户端)

滚动升级:
  1. 更新所有broker的server.properties文件,并添加以下属性:
    • inter.broker.protocol.version=CURRENT_KAFKA_VERSION (如:0.8.2.0, 0.9.0.0或0.10.0.0).
    • log.message.format.version=CURRENT_KAFKA_VERSION (有关此配置的详细信息,请查看升级后潜在的性能影响。)
  2. 每次升级一个broker:关闭broker,替换新版本,然后重新启动它。
  3. 一旦整个群集升级,通过编辑inter.broker.protocol.version并将其设置为0.10.1.0来转换所有协议。
  4. 如果之前的消息格式是0.10.0,则将log.message.format.version更改为0.10.1(这无影响,因为0.10.0和0.10.1的消息格式是相同的)。如果之前的消息格式版本低于.10.0,还不能更改log.message.format.version - 一旦所有的消费者都已升级到 0.10.0.0 或更高版本时,才能更改此参数。
  5. 逐个重新启动broker,使新协议版本生效。
  6. 如果log.message.format.version低于0.10.0,请等待,知道所有消费者升级到0.10.0或更新的版本,然后将每个broker的log.message.format.version更改为0.10.1。然后逐个重启。

注意:如果你可接受停机,你可以简单地将所有broker关闭,更新版本并重启启动,它们将默认从新版本开始。

注意:变换协议版本和重启启动可以在broker升级完成后的任何时间去做,不必马上做。

在0.10.1.0中潜在的变化

  • 日志保留时间不再基于日志段的最后修改时间。相反,它将基于日志段中消息的最大时间戳。

  • 日志滚动时间不再取决于日志段的创建时间。而是基于消息中的时间戳。进一步来说。如果日志段中第一个消息的时间戳是T,则当新的消息的时间戳大于或等于T+log.roll.ms时,日志将推出。

  • 0.10.0 的打开的文件处理将增加了约33%,因为为每个段增加时间索引文件。

  • 时间索引和offset索引共享相同的索引大小配置。因为每个时间索引条目是offset索引条目的1.5备。用户可能需要增加log.index.size.max.bytes以避免频繁的日志滚动。

  • 由于索引文件数量增加,对于一些有大量日志段的broker(即 >15k),在broker启动期间,日志加载处理可能更长。根据我们的实现,num.recovery.threads.per.data.dir设置为1可减少日志加载的时间。

0.10.1.0显著的变化

  • 新的java消费者不再是测试阶段了,我们建议将其应用到所有的新开发当中。旧的Scala使用仍然支持,但将在下一个版本中弃用,并在未来的主要版本中移除。

  • --new-consumer/--new.consumer转换不再需要使用MirrorMaker和类似于Console消费者工具。只需要通过一个Kafka broker连接,而不是ZooKeeper了。另外,控制台消费者和旧消费者已弃用,并且将在未来的主要版本中移除。

  • Kafka集群现在可通过集群ID来标识唯一,broker升级到0.10.1.0时将自动的生成。集群ID可通过kafka.server:type=KafkaServer,name=ClusterId获取。它是元数据相应的一部分,序列化,客户端拦截器和度量记录器可通过实现ClusterResourceListener接口来接收集群ID。

  • BrokerState "RunningAsController" (value 4) 已被移除。由于一个bug,brpker仅在转换出来之前处于这种状态,因此移除影响应该是最小的。推荐的方法是通过kafka检查给定的broker是否是控制器。controller:type=KafkaController,name=ActiveControllerCount

  • 新的Java消费者现在允许用户通过分区上的时间戳来搜索offset。

  • 新的Java消费者现在支持后台线程心跳检测,有一个新的配置max.poll.interval.ms控制消费者主动离开组之前poll调用之间的最大时间(默认是5分钟)。配置request.timeout.ms的值必须始终大于max.poll.interval.ms,因为JoinGroup请求在消费者重新平衡时候阻塞服务器的最大时间。因此我们更改了其默认值超过5分钟,最后,session.timeout.ms的默认值已调整为10秒,并max.poll.records的默认值更改为500。

  • 当使用Authorizer并且用户对topic没有描述授权时,broker将不再向请求返回TOPIC_AUTHORIZATION_FAILED错误,因为这会泄漏topic名称。 相反,将返回UNKNOWN_TOPIC_OR_PARTITION错误代码。 当使用生产者和消费者时,这可能导致意外的超时或延迟,因为Kafka客户端通常将在未知的topic错误时自动重试。 如果您怀疑这可能已经正在发生,你应该查阅客户端日志。

  • 获取响应的默认的限制大小(消费者为50MB,副本为10MB)。现有的分区限制也适用(消费者和副本是1MB)。注意,这些限制不是绝对的最大值(下一节解释)。

  • 如果一个消息大于响应/分区大小限制,消费者和副本可以继续使用。更具体的是,如果在第一个非空分区中的第一个消息大于限制,则消息将仍然返回。

  • kafka.api.FetchRequest和kafka.javaapi.FetchRequest中增加了重载的构造函数。以允许调用者去指定分区的顺序(因为在v3中顺序很重要)。之前的构造函数已弃用。在请求发送之前,以避免资源匮乏问题引起的混洗。

新协议版本

  • ListOffsetRequest v1支持基于时间戳的精确offset搜索。

  • MetadataResponse v2引入了一个新字段:“cluster_id”。

  • FetchRequest v3支持限制响应大小(除了现有的分区限制)。

  • JoinGroup v1引入了一个新字段:“rebalance_timeout”。

从0.8.x 或 0.9.x 升级到 0.10.0.0

0.10.0.0具有潜在的突变更改(请在升级之前查看),以及升级后可能的性能影响。 通过遵循以下建议的滚动升级计划,可保障在升级期间和之后不会出现停机时间和性能影响。
注意:由于引入了新协议,因此在升级客户端之前先升级Kafka集群。

注意,对于版本0.9.0.0:由于0.9.0.0中有一个bug,依赖于Zookeeper(旧的Scala高级消费者和MirrorMaker如果一起使用)的客户端将无法在0.10.0.x中使用。因此,broker升级到0.10.0.x之前,先升级0.9.0.0客户端到0.9.0.1。对于0.8.X或0.9.0.1客户端,此步骤不是必需的。

滚动升级:
  1. 更新所有broker的server.properties文件,并添加以下配置:

    • inter.broker.protocol.version=CURRENT_KAFKA_VERSION (例如:0.8.2 或 0.9.0.0).
    • log.message.format.version=CURRENT_KAFKA_VERSION (有关此配置的详细信息,请查看升级后潜在的性能影响。)
  2. 升级broker,关闭它,然后升级到新版本,最后重启它。

  3. 一旦整个集群升级完成,通过编辑inter.broker.protocol.version设置为0.10.0.0转换所有协议。注意:你现在应该还不需要设置message.format.version - 此配置应该当所有的消费者升级为0.10.0.0时才需要设置。

  4. 依次重新启动broker,使新协议版本生效。

  5. 一旦所有的消费者已经升级为.10.0,设置每个broker的log.message.format.version为0.10.0,然后逐个重启。

注意 :如果你接受停机目,你可以简单粗暴的关闭所有broker,更新版本并重新启动。它们默认从新协议开始。

注意 :变换协议版本和重启启动可以在broker升级完成后的任何时间去做,不必马上做。

升级到0.10.0.0后潜在的性能影响

0.10.0中的消息格式包括新的时间戳字段,并使用压缩消息的相关联的offset。磁盘默认的消息格式是0.10.0,消息格式可以通过server.properties中的log.message.format.version配置。如果消费者客户端版本低于0.10.0.0。它只能“理解”0.10.0之前的消息格式。在这种情况下,broker在发送响应到旧版本消费者之前转换0.10.0格式到之前的格式。然而,这样的话,broker不是零复制传输。在Kafka社区关于性能影响的报告显示,在升级后,CPU利用率从20%提高100%。这迫使所有客户端马升级,促使性能恢复正常。为了避免消费者升级到0.10.0.0之前的消息转换,可以设置log.message.format.version为0.8.2或0.9.0。这样,broker仍然零复制传输将数据发送给旧的消费者。一旦消费者升级,就可以把消息格式更为0.10.0,就可以享受含新时间戳和优化后的压缩新消息格式。转换只是为了确保兼容性,尽可能避免消息转换才是至关重要的。

客户端升级到0.10.0.0,不会对性能产生影响。

注意 :通过设置消息格式版本,可以证明所有现有消息处于或低于该消息格式版本。否则消费者在0.10.0.0之前可能会中断。特别是,在消息格式设置为0.10.0之后,不应将其更改回较早的格式,因为它可能会在0.10.0.0之前的版本上中断消费者。

注意 :由于在每个消息中引入了额外的时间戳,生产者在发送少量消息可能会看到消息吞吐量下降(因为增加了开销)。 同样,复制每个消息传输也增加了8个字节。 如果你集群的能力与网络接近,可能会超过网卡,并看到由于过载的故障和性能问题。

注意 :如果生产者已经启用了压缩,则在某些情况下,可能注意到生产者吞吐量减少或broker的压缩率降低。当接收压缩消息时,0.10.0的broker避免再次压缩消息,这样减少延迟并提高吞吐量。然而,在某些情况下,这可能减少生产者的批次大小,导致较差的吞吐量。如果出现这种情况,可调整生产者的linger.ms 和 batch.size以提高吞吐量。另外,生产者用于压缩消息的缓存小于broker生产者使用的缓存,这可能对磁盘上的消息的压缩比有负面影响。 我们打算在未来的Kafka版本中进行配置。

0.10.0.0潜在的中断

  • 从Kafka 0.10.0.0开始,Kafka中的消息格式版本表示为Kafka的版本。例如,消息格式0.9.0指的是支持的最高消息版本就是0.9.0。

  • 消息格式0.10.0已经介绍过了,并且默认是使用的。消息包含了一个时间戳字段和压缩后消息的关系offset。

  • 已经引入了ProduceRequest/Response v2,并默认使用支持消息格式0.10.0。

  • 已经引入了FetchRequest/Response v2已经被引入,它默认使用支持消息格式0.10.0。

  • MessageFormatter 接口从def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) 更改为 def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream)

  • MessageReader 接口从 def readMessage(): KeyedMessage[Array[Byte], Array[Byte]] 更改为 def readMessage(): ProducerRecord[Array[Byte], Array[Byte]]

  • MessageFormatter的包从kafka.tools到kafka.common

  • MessageReader的包从kafka.tools到kafka.common

  • MirrorMakerMessageHandler不再处理(记录:MessageAndMetadata [Array [Byte],Array [Byte]])方法从未被调用用。

  • 0.7版本的KafkaMigrationTool不再和kafka一起打包。如果你需要从0.7迁移到0.10.0,请先迁移到0.8,然后按照的升级步骤从0.8升级到0.10.0。

  • 新消费者API已标准化,接收java.util.Collection作为方法参数的序列化类型。升级现有的版本才能使用0.10.0客户端库

  • LZ4压缩消息处理已更改为使用可互操作的规范框架(LZ4f v1.5.1)。为了保留与旧客户端的兼容性,此改变仅适用于消息格式为0.10.0和更高版本。使用v0/v1(消息格式0.9.0)Produce/Fetch LZ4压缩消息的客户端应继续使用0.9.0实现框架。使用Produce/Fetch协议v2或更高版本的客户端应使用可互操作的LZ4f框架。可互操作的LZ4库的列表可在https://www.lz4.org/查看

在0.10.0.0的显著变化

  • 从0.10.0.0开始,增加一个新的客户端Kafka Streams客户端,用于流式处理存储在kafka topic的数据。这个新客户端仅支持0.10.x或更高的版本。

  • 新消费者默认receive.buffer.bytes是64K。

  • 新的消费者现在公开了exclude.internal.topics配置,以防止内部topic(例如消费者offset topic)被其他的正则匹配订阅。默认是启用。

  • 旧的的Scala的生产者已经弃用。使用者尽快使用最新的Java客户端。

新的消费者API已标记为稳定。

从0.8.0, 0.8.1.X或0.8.2.X升级到0.9.0.0

9.0.0有潜在的中断更改风险(在升级之前需要知道),并且与之前版本的broker之间的协议改变。这意味着此次升级可能和客户端旧版本不兼容。因此在升级客户端之前,先升级kafka集群。如果你使用MirrorMaker下游集群,则同样应首先升级。

滚动升级
  1. 升级所有broker的server.properties,并在其中添加inter.broker.protocol.version = 0.8.2.X

  2. 每次升级一个broker:关闭broker,替换新版本,然后重新启动。

  3. 一旦整个群集升级,通过编辑inter.broker.protocol.version并将其设置为0.9.0.0来转换所有协议。

  4. 逐个重新启动broker,使新协议版本生效。

注意 :如果你可接受停机,你可以简单地将所有broker关闭,更新版本并重启启动,协议将默认从新版本开始。

注意 :变换协议版本和重启启动可以在broker升级完成后的任何时间去做,不必马上做。

0.9.0.0潜在的中断变化

  • Java 1.6不再支持。

  • Scala 2.9不再支持。

  • 默认情况下,1000以上的Broker ID为自动分配。如果你的集群高于该阈值,需相应地增加reserved.broker.max.id配置。

  • replica.lag.max.messages配置已经移除。分区leader在决定哪些副本处于同步时将不再考虑落后的消息的数。

  • 配置参数replica.lag.time.max.ms现在不仅指自上次从副本获取请求后经过的时间,还指自副本上次被捕获以来的时间。 副本仍然从leader获取消息,但超过replica.lag.time.max.ms配置的最新消息将被认为不同步的。

  • 压缩的topic不再接受没有key的消息,如果出现,生产者将抛出异常。 在0.8.x中,没有key的消息将导致日志压缩线程退出(并停止所有压缩的topic)。

  • MirrorMaker不再支持多个目标集群。 它只接受一个--consumer.config。 要镜像多个源集群,每个源集群至少需要一个MirrorMaker实例,每个源集群都有自己的消费者配置。

  • 在org.apache.kafka.clients.tools。包下的Tools已移至org.apache.kafka.tools。。 所有包含的脚本仍将照常工作,只有直接导入这些类的自定义代码将受到影响。

  • 在kafka-run-class.sh中更改了默认的Kafka JVM性能选项(KAFKA_JVM_PERFORMANCE_OPTS)。

  • kafka-topics.sh脚本(kafka.admin.TopicCommand)现在退出,失败时出现非零退出代码。

  • kafka-topics.sh脚本(kafka.admin.TopicCommand)现在将在topic名称由于使用“.”或“_”而导致风险度量标准冲突时打印警告。以及冲突的情况下的错误。

  • kafka-console-producer.sh脚本(kafka.tools.ConsoleProducer)将默认使用新的Java Producer,用户必须指定“old-producer”才能使用旧生产者。

  • 默认情况下,所有命令行工具都会将所有日志消息打印到stderr而不是stdout。

0.9.0.1中的显著变化

  • 可以通过将broker.id.generation.enable设置为false来禁用新的broker ID生成功能。

  • 默认情况下,配置参数log.cleaner.enable为true。 这意味着topic会清理。

  • policy = compact现在将被默认压缩,并且128MB的堆(通过log.cleaner.dedupe.buffer.size)分配给清洗进程。你可能需要根据你对压缩topic的使用情况,查看log.cleaner.dedupe.buffer.size和其他log.cleaner配置值。

  • 默认情况下,新消费者的配置参数fetch.min.bytes的默认值为1。

0.9.0.0弃用的

  • kafka-topics.sh脚本的变更topic配置已弃用(kafka.admin.ConfigCommand),以后将使用kafka-configs.sh(kafka.admin.ConfigCommand) 。

  • kafka-consumer-offset-checker.sh(kafka.tools.ConsumerOffsetChecker)已弃用,以后将使用kafka-consumer-groups.sh (kafka.admin.ConsumerGroupCommand)

  • kafka.tools.ProducerPerformance已弃用。以后将使用org.apache.kafka.tools.ProducerPerformance(kafka-producer-perf-test.sh也将使用新类)

  • 生产者的block.on.buffer.full已弃用,并将在以后的版本中移除。目前其默认已经更为false。KafkaProducer将不再抛出BufferExhaustedException,而是使用max.block.ms来中止,之后将抛出TimeoutException。如果block.on.buffer.full属性明确地设置为true,它将设置max.block.ms为Long.MAX_VALUE和metadata.fetch.timeout.ms将不执行。

从0.8.1升级到0.8.2

0.8.2与0.8.1完全兼容。 关闭,更新代码并重新启动,逐个升级broker。

从0.8.0升级到0.8.1

0.8.1与0.8完全兼容。 关闭,更新代码并重新启动,逐个升级broker。

从0.7升级

版本0.7与较新版本不兼容。 对API,ZooKeeper数据结构,协议和配置进行了主要更改,以便添加复制(在0.7中缺失)。 从0.7版升级到更高版本需要一个特殊的迁移工具(通过下一章的API)。 此迁移可以在不停机的情况下完成。

更新于 2018-05-15
在线,1小时前登录

Bear 7年前

请问升级是替换掉哪些文件呢,对数据会有影响吗?

半兽人 -> Bear 7年前

是整个替换,重新修改server.config

Jerry 7年前

从0.8.0, 0.8.1.X或0.8.2.X升级到0.9.0.0



这个升级步骤不是很明白啊?求指教。第二步的替换新版本怎么替换呢。

半兽人 -> Jerry 7年前

替换新版本,不就是下个新版的替换旧版本的么。

Jerry -> 半兽人 7年前

就是在新版的配置文件中添加inter.broker.protocol.version = 0.8.2.X对吧

查看kafka更多相关的文章或提一个关于kafka的问题,也可以与我们一起分享文章