kafka在数据迁移期间限制带宽的使用

原创
半兽人 发表于: 2017-02-10   最后更新时间: 2018-01-01 23:43:35  
{{totalSubscript}} 订阅, 19,360 游览

Kafka提供一个broker之间复制传输的流量限制,限制了副本从机器到另一台机器的带宽上限。当重新平衡集群,引导新broker,添加或移除broker时候,这是很有用的。因为它限制了这些密集型的数据操作从而保障了对用户的影响。

有2个接口可以实现限制。最简单和最安全的是调用kafka-reassign-partitions.sh时加限制。另外kafka-configs.sh也可以直接查看和修改限制值。

例如,当执行重新平衡时,用下面的命令,它在移动分区时,将不会超过50MB/s。

$ bin/kafka-reassign-partitions.sh --zookeeper myhost:2181--execute --reassignment-json-file bigger-cluster.json —throttle 50000000

当你运行这个脚本,你会看到这个限制:

The throttle limit was set to 50000000 B/s
Successfully started reassignment of partitions.

如果你想在重新平衡期间修改限制,增加吞吐量,以便完成的更快。你可以重新运行execute命令,用相同的reassignment-json-file:

$ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181  --execute --reassignment-json-file bigger-cluster.json --throttle 700000000

  There is an existing assignment running.
  The throttle limit was set to 700000000 B/s

一旦重新平衡完成,可以使用--verify操作验证重新平衡的状态。如果重新平衡已经完成,限制也会通过--verify命令移除。这点很重要,因为一旦重新平衡完成,并通过--veriry操作及时移除限制。否则可能会导致定期复制操作的流量也受到限制。

当--verify执行,并且重新分配已完成时,此脚本将确认限制被移除:

$ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181  --verify --reassignment-json-file bigger-cluster.json
  Status of partition reassignment:
  Reassignment of partition [my-topic,1] completed successfully
  Reassignment of partition [mytopic,0] completed successfully
  Throttle was removed.

管理员还可以使用kafka-configs.sh验证已分配的配置。有2对限制配置用于管理限流。而限制值本身,是个broker级别的配置,用于动态属性配置:

leader.replication.throttled.rate
  follower.replication.throttled.rate

此外,还有枚举集合的限流副本:

leader.replication.throttled.replicas
  follower.replication.throttled.replicas

其中每个topic配置,所有4个配置值通过kafka-reassign-partitions.sh(下面讨论)自动分配。

查看限流配置:

$ bin/kafka-configs.sh --describe --zookeeper localhost:2181 --entity-type brokers
  Configs for brokers '2' are leader.replication.throttled.rate=700000000,follower.replication.throttled.rate=700000000
  Configs for brokers '1' are leader.replication.throttled.rate=700000000,follower.replication.throttled.rate=700000000

这显示了应用于复制协议的leader和follower的限制。默认情况下,2个都分配了相同的限制值。

要查看限流副本的列表:

$ bin/kafka-configs.sh --describe --zookeeper localhost:2181 --entity-type topics
  Configs for topic 'my-topic' are leader.replication.throttled.replicas=1:102,0:101,
      follower.replication.throttled.replicas=1:101,0:102

这里我们看到leader限制被应用到broker 102上的分区1和broker 101的分区0.同样,follower限制应用到broker 101的分区1和broker 102的分区0.

默认情况下,kafka-reassign-partitions.sh会将leader限制应用于重新平衡前存在的所有副本,任何一个副本都可能是leader。它将应用follower限制到所有移动目的地。因此,如果broker 101,102上有一个副本分区,被分配给102,103,则该分区的leader限制,将被应用到101,102,并且follower限制将仅被应用于103。

如果需要,你还可以使用kafka-configs.sh的--alter开关手动地更改限制配置。

安全的使用限制复制

在使用限制复制时应各位的小心,特别是:

(1) 限制移除:

一旦重新分配完成,限制应该及时的移除(通过运行kafka-reassign-partitions —verify移除)。

(2) 确保进展:

如果限制设置的太低,与传入的写入速率相比,复制可能无法进行:

max(BytesInPerSec) > throttle

其中BytesInPerSec是监控生产者写入到broker的吞吐量。

可以使用该命令监视重新平衡期间复制是否在进行,使用以下方式:

kafka.server:type=FetcherLagMetrics,name=ConsumerLag,clientId=([-.\w]+),topic=([-.\w]+),partition=([0-9]+)

在复制期间落后应不断地减少,如果没有缩小,则管理员通过上面介绍的方式增加限制的吞吐量。

设置配额

默认情况下,客户端的配额不受限制。可以为每个(user,client-id),user或client-id分组设置自定义的配额。

配置自定义的配额(user=user1,client-id=clientA):

> bin/kafka-configs.sh  --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048' --entity-type users --entity-name user1 --entity-type clients --entity-name clientA
Updated config for entity: user-principal 'user1', client-id 'clientA'.

user=user1配置自定义的配额:

> bin/kafka-configs.sh  --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048' --entity-type users --entity-name user1
Updated config for entity: user-principal 'user1'.

client-id=clientA配置自定义的配额:

> bin/kafka-configs.sh  --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048' --entity-type clients --entity-name clientA
Updated config for entity: client-id 'clientA'.

可以通过--entity-default为(user,client-id),userclient-id group设置默认的配额。
为user=userA配置默认client-id配额:

> bin/kafka-configs.sh  --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048' --entity-type users --entity-name user1 --entity-type clients --entity-default
Updated config for entity: user-principal 'user1', default client-id.

为user配置默认配额:

> bin/kafka-configs.sh  --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048' --entity-type users --entity-default
Updated config for entity: default user-principal.

为client-id配置默认配额:

> bin/kafka-configs.sh  --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048' --entity-type clients --entity-default
Updated config for entity: default client-id.

为指定的(user,client-id)展示配额:

> bin/kafka-configs.sh  --zookeeper localhost:2181 --describe --entity-type users --entity-name user1 --entity-type clients --entity-name clientA
Configs for user-principal 'user1', client-id 'clientA' are producer_byte_rate=1024,consumer_byte_rate=2048

为指定的user展示配额:

> bin/kafka-configs.sh  --zookeeper localhost:2181 --describe --entity-type users --entity-name user1
Configs for user-principal 'user1' are producer_byte_rate=1024,consumer_byte_rate=2048

为指定的client-id展示配额。

> bin/kafka-configs.sh  --zookeeper localhost:2181 --describe --entity-type clients --entity-name clientA
Configs for client-id 'clientA' are producer_byte_rate=1024,consumer_byte_rate=2048

如果没有指定名称,则展示指定的类型的,查看所有user:

> bin/kafka-configs.sh  --zookeeper localhost:2181 --describe --entity-type users
Configs for user-principal 'user1' are producer_byte_rate=1024,consumer_byte_rate=2048
Configs for default user-principal are producer_byte_rate=1024,consumer_byte_rate=2048

(user,client)也是一样:

> bin/kafka-configs.sh  --zookeeper localhost:2181 --describe --entity-type users --entity-type clients
Configs for user-principal 'user1', default client-id are producer_byte_rate=1024,consumer_byte_rate=2048
Configs for user-principal 'user1', client-id 'clientA' are producer_byte_rate=1024,consumer_byte_rate=2048

可以通过在broker中设置以下配置来设置适用于所有client-id的默认配额。仅当未在Zookeeper中配置配额覆盖或默认值时,才应用这些属性。 默认情况下,每个client-id接收一个无限制的配额。 以下将每个producerconsumer client-id的默认配额设置为10MB /秒。

quota.producer.default=10485760
quota.consumer.default=10485760

请注意,这些属性已被弃用,可能会在将来的版本中删除。 请优先使用kafka-configs.sh。

更新于 2018-01-01
在线,7分钟前登录

秦山木槿花 4年前

这种情况有啥解决办法吗,比如说我这个分区副本为1、2、3,然后我要迁移成1、2、4。
这个kafka单个分区(20G左右)迁移后,卡着不动了。
现在describe了看到现象为
replica:1、2、3、4,ISR:1、2、4
我能把replica中的3删掉吗(在zk的、broker/topics/xxx节点值中set一下)?

意思就是说迁移任务一直卡着in progress,但要更改的副本其实已经到了isr中了,只不过在replicas中还有原来的那个老id,能给它在zk中干掉吗
如果能干掉,干掉后需要重启kafka集群吗

迁移在进行中,你多等等吧,不是也有命令可以查看迁移进度么。
你改zk可能会导致整个集群错乱崩溃的。

啥命令看迁移进度啊?
我只知道verify。。。。然后就一直in progress了

等了2天了没反应

参考命令:https://www.orchome.com/454
有查询集群的命令,步骤:
1、查询消费者组列表。
2、拿到消费者组列表后,查看迁移消费者组详情。
3、看看lag延迟差距多少。

如果lag持续在减少,那继续等待吧,反之,持续新增的数据比同步的速度快,那你新建个分区,将消息推倒新分区上。

迁移消费者组id咋知道嘞,我用consumer.group.sh --list查了下,没找到

祖晓晖 -> 半兽人 3年前

kafka集群缩容分区迁移太慢:需要对kafka集群进行缩容,目前是9个broker,需要缩容到7个,提交执行了分区迁移计划。kafka版本2.11-1.1.0
问题:
1、7个小时了,还没执行完,请问这个是正常的吗
2、有什么办法可以加快分区迁移,能否通过删除持久化数据,或者删除并新建topic。(集群可以容忍数据丢失,存日志的)
3、查看topic详细信息,有的副本数多余isr,设置的是3副本,但是显示Replicas: 6,7,3,8,4 Isr: 6,8,7,3
4、通过主机监控,发现被迁移的主机,网卡流出量有200多MB,说明是在同步的吧。

小鱼儿 4年前

博主,有2个疑问

  1. kafka配额的主要作用是为了限流leader副本或follower副本之间的流控?如果我想控制整个集群的流控是不是为此集群中的每个broker配置users.id,再user配置配额即可?
  2. 如果为user=user1配置自定义的配额,是不是以意味着所有user1的producer和consumer的流控到不得超过1024,超过后会sleep,是这个意思吗?
半兽人 -> 小鱼儿 4年前

1、是的,只有副本需要同步leader。
2、客户端限流是分别设定producer和consumer的,例子里全是这种吧

一旦重新平衡完成,可以使用--verify操作验证重新平衡的状态。如果重新平衡已经完成,限制也会通过--verify命令移除。这点很重要,因为一旦重新平衡完成,并通过--veriry操作及时移除限制。否则可能会导致定期复制操作的流量也受到限制。

你好,这个如果标记流量限制没有主动触发删除,根据上述文本的意思,是会阻塞broker之间的数据同步。这个我理解是不会的吧

是我想多了,可以忽略下

Kafka提供一个broker之间复制传输的流量限制,限制了副本从机器到另一台机器的带宽上线。。。
有2个接口可以实现限制。最简单和最安全的是调用afka-reassign-partitions.sh时加限制。另外kafka-configs.sh也可以直接查看和修改限制值

你好,这里有两处小错误:
1.限制了副本从机器到另一台机器的带宽上线(限)
2.最简单和最安全的是调用 afka-reassign-partitions.sh 时加限制(kafka-reassign-partitions.sh 原文少了个k)

顺带一句祝福,楼主新年快乐。

感谢指正,新年快乐~!

爱静致远 7年前
leader.replication.throttled.replicas
  follower.replication.throttled.replicas 楼主,这连个参数是什么意思?怎么使用呢?
半兽人 -> 爱静致远 7年前

限流leader副本或follower副本之间的流控

爱静致远 -> 半兽人 7年前

还是不太理解,能举个例子吗?

BIG BOMB!!!! 7年前

请问你使用的版本是什么版本的kafka?
我在增加配合使用 --entity-type users选项的时候报错:

Exception in thread "main" java.lang.IllegalArgumentException: --entity-type must be 'topics' or 'clients' at kafka.admin.ConfigCommand$ConfigCommandOptions.checkArgs(ConfigCommand.scala:192) at kafka.admin.ConfigCommand$.main(ConfigCommand.scala:46) 
at kafka.admin.ConfigCommand.main(ConfigCommand.scala)
半兽人 -> BIG BOMB!!!! 7年前

你的全命令是什么?

爱静致远 -> 半兽人 7年前

半兽人能帮我解决一个问题吗?leader.replication.throttled.replicas  我的QQ,522636854.

查看kafka更多相关的文章或提一个关于kafka的问题,也可以与我们一起分享文章