kafka扩大集群

原创
半兽人 发表于: 2015-03-10   最后更新时间: 2017-04-12 09:15:42  
{{totalSubscript}} 订阅, 23,021 游览

Adding servers to a Kafka cluster is easy, just assign them a unique broker id and start up Kafka on your new servers. However these new servers will not automatically be assigned any data partitions, so unless partitions are moved to them they won't be doing any work until new topics are created. So usually when you add machines to your cluster you will want to migrate some existing data to these machines.

增加新服务到kafka集群是很容易的,只要为新服务分配一个独一无二的Broker ID并启动即可。但是,新的服务不会自动分配到任何数据,需要把分区数据迁移给它们,在此期间它们一直不工作,直到新的topic创建,所以,通常向集群添加机器时,你需要将一些现有的数据迁移到这些机器上。


The process of migrating data is manually initiated but fully automated. Under the covers what happens is that Kafka will add the new server as a follower of the partition it is migrating and allow it to fully replicate the existing data in that partition. When the new server has fully replicated the contents of this partition and joined the in-sync replica one of the existing replicas will delete their partition's data.

迁移数据的过程是手动启动的,但是执行过程是完全自动化的。在kafka后台内部中,kafka将添加新的服务器,并作为正在迁移分区的follower,来完全复制该分区现有的数据。当新服务器完全复制该分区的内容并加入同步副本,成为现有副本之一后,就将现有的副本分区上的数据删除。


The partition reassignment tool can be used to move partitions across brokers. An ideal partition distribution would ensure even data load and partition sizes across all brokers. In 0.8.1, the partition reassignment tool does not have the capability to automatically study the data distribution in a Kafka cluster and move partitions around to attain an even load distribution. As such, the admin has to figure out which topics or partitions should be moved around.

分区重新分配工具可以用于跨broker迁移分区,理想的分区分配将确保所有的broker数据负载和分区大小。分区分配工具没有自动研究kafka集群的数据分布和迁移分区达到负载分布的能力,因此,管理员要弄清楚哪些topic或分区应该迁移。


The partition reassignment tool can run in 3 mutually exclusive modes -

分区分配工具的3种模式 -

  • --generate: In this mode, given a list of topics and a list of brokers, the tool generates a candidate reassignment to move all partitions of the specified topics to the new brokers. This option merely provides a convenient way to generate a partition reassignment plan given a list of topics and target brokers.
    --generate: 这个选项命令,是生成分配规则json文件的,生成“候选人”重新分配到指定的topic的所有parition都移动到新的broker。此选项,仅提供了一个方便的方式来生成特定的topic和目标broker列表的分区重新分配 “计划”。
  • --execute: In this mode, the tool kicks off the reassignment of partitions based on the user provided reassignment plan. (using the --reassignment-json-file option). This can either be a custom reassignment plan hand crafted by the admin or provided by using the --generate option
    --execute: 这个选项命令,是执行你用--generate 生成的分配规则json文件的,(用--reassignment-json-file 选项),可以是自定义的分配计划,也可以是由管理员或通过--generate选项生成的。
  • --verify: In this mode, the tool verifies the status of the reassignment for all partitions listed during the last --execute. The status can be either of successfully completed, failed or in progress
    --verify: 这个选项命令,是验证执行--execute重新分配后,列出所有分区的状态,状态可以是成功完成,失败或正在进行中的。

自动将数据迁移到新机器

The partition reassignment tool can be used to move some topics off of the current set of brokers to the newly added brokers. This is typically useful while expanding an existing cluster since it is easier to move entire topics to the new set of brokers, than moving one partition at a time. When used to do this, the user should provide a list of topics that should be moved to the new set of brokers and a target list of new brokers. The tool then evenly distributes all partitions for the given list of topics across the new set of brokers. During this move, the replication factor of the topic is kept constant. Effectively the replicas for all partitions for the input list of topics are moved from the old set of brokers to the newly added brokers.

使用分区重新分配工具将从当前的broker集的一些topic移到新添加的broker。同时扩大现有集群,因为这很容易将整个topic移动到新的broker,而不是每次移动一个parition,你要提供新的broker和新broker的目标列表的topic列表(就是刚才的生成的json文件)。然后工具将根据你提供的列表把topic的所有parition均匀地分布在所有的broker,topic的副本保持不变。


For instance, the following example will move all partitions for topics foo1,foo2 to the new set of brokers 5,6. At the end of this move, all partitions for topics foo1 and foo2 will only exist on brokers 5,6
例如,下面的例子将主题foo1,foo2的所有分区移动到新的broker 5,6。移动结束后,主题foo1和foo2所有的分区都会只会在broker 5,6。


注意:站长友情提示各位kafka学习者,下面所有的json文件,都是要你自己新建的,不是自动创建的,需要你自己把生成的规则复制到你新建的json文件里,然后执行。

Since, the tool accepts the input list of topics as a json file, you first need to identify the topics you want to move and create the json file as follows-
执行迁移工具需要接收一个json文件,首先需要你确认topic的迁移计划并创建json文件,如下所示

> cat topics-to-move.json
{"topics": [{"topic": "foo1"},
            {"topic": "foo2"}],
 "version":1
}

Once the json file is ready, use the partition reassignment tool to generate a candidate assignment-

一旦json准备好,使用分区重新分配工具生成一个“候选人”分配规则 -

> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-to-move.json --broker-list "5,6" --generate 
Current partition replica assignment

{"version":1,
 "partitions":[{"topic":"foo1","partition":2,"replicas":[1,2]},
               {"topic":"foo1","partition":0,"replicas":[3,4]},
               {"topic":"foo2","partition":2,"replicas":[1,2]},
               {"topic":"foo2","partition":0,"replicas":[3,4]},
               {"topic":"foo1","partition":1,"replicas":[2,3]},
               {"topic":"foo2","partition":1,"replicas":[2,3]}]
}

Proposed partition reassignment configuration

{"version":1,
 "partitions":[{"topic":"foo1","partition":2,"replicas":[5,6]},
               {"topic":"foo1","partition":0,"replicas":[5,6]},
               {"topic":"foo2","partition":2,"replicas":[5,6]},
               {"topic":"foo2","partition":0,"replicas":[5,6]},
               {"topic":"foo1","partition":1,"replicas":[5,6]},
               {"topic":"foo2","partition":1,"replicas":[5,6]}]
}

The tool generates a candidate assignment that will move all partitions from topics foo1,foo2 to brokers 5,6. Note, however, that at this point, the partition movement has not started, it merely tells you the current assignment and the proposed new assignment. The current assignment should be saved in case you want to rollback to it. The new assignment should be saved in a json file (e.g. expand-cluster-reassignment.json) to be input to the tool with the --execute option as follows-
生成从主题foo1,foo2迁移所有的分区到broker 5,6的候选人分配规则。注意,这个时候,迁移还没有开始,它只是告诉你当前分配和新的分配规则,当前分配规则用来回滚,新的分配规则保存在json文件(例如,我保存在 expand-cluster-reassignment.json这个文件下)然后,用--execute选项来执行它。

> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --execute
Current partition replica assignment

{"version":1,
 "partitions":[{"topic":"foo1","partition":2,"replicas":[1,2]},
               {"topic":"foo1","partition":0,"replicas":[3,4]},
               {"topic":"foo2","partition":2,"replicas":[1,2]},
               {"topic":"foo2","partition":0,"replicas":[3,4]},
               {"topic":"foo1","partition":1,"replicas":[2,3]},
               {"topic":"foo2","partition":1,"replicas":[2,3]}]
}

Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions
{"version":1,
 "partitions":[{"topic":"foo1","partition":2,"replicas":[5,6]},
               {"topic":"foo1","partition":0,"replicas":[5,6]},
               {"topic":"foo2","partition":2,"replicas":[5,6]},
               {"topic":"foo2","partition":0,"replicas":[5,6]},
               {"topic":"foo1","partition":1,"replicas":[5,6]},
               {"topic":"foo2","partition":1,"replicas":[5,6]}]
}

Finally, the --verify option can be used with the tool to check the status of the partition reassignment. Note that the same expand-cluster-reassignment.json (used with the --execute option) should be used with the --verify option

最后,--verify 选项用来检查parition重新分配的状态,注意, expand-cluster-reassignment.json(与--execute选项使用的相同)和--verify选项一起使用。

> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --verify
Status of partition reassignment:
Reassignment of partition [foo1,0] completed successfully
Reassignment of partition [foo1,1] is in progress
Reassignment of partition [foo1,2] is in progress
Reassignment of partition [foo2,0] completed successfully
Reassignment of partition [foo2,1] completed successfully 
Reassignment of partition [foo2,2] completed successfully 


自定义分区分配和迁移

The partition reassignment tool can also be used to selectively move replicas of a partition to a specific set of brokers. When used in this manner, it is assumed that the user knows the reassignment plan and does not require the tool to generate a candidate reassignment, effectively skipping the --generate step and moving straight to the --execute step
分区重新分配工具也可以有选择性将分区副本移动到指定的broker。当用这种方式,假定你已经知道了分区规则,不需要通过工具生成规则,可以跳过--generate,直接使用—execute


For instance, the following example moves partition 0 of topic foo1 to brokers 5,6 and partition 1 of topic foo2 to brokers 2,3
例如,下面的例子是移动主题foo1的分区0到brokers 5,6 和主题foo2的分区1到broker 2,3。


The first step is to hand craft the custom reassignment plan in a json file-

第一步是,手工写一个自定义的分配计划到json文件中 -

> cat custom-reassignment.json
{"version":1,"partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]},{"topic":"foo2","partition":1,"replicas":[2,3]}]}

Then, use the json file with the --execute option to start the reassignment process-

然后,--execute 选项执行分配处理 -

> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file custom-reassignment.json --execute
Current partition replica assignment

{"version":1,
 "partitions":[{"topic":"foo1","partition":0,"replicas":[1,2]},
               {"topic":"foo2","partition":1,"replicas":[3,4]}]
}

Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions
{"version":1,
 "partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]},
               {"topic":"foo2","partition":1,"replicas":[2,3]}]
}

The --verify option can be used with the tool to check the status of the partition reassignment. Note that the same expand-cluster-reassignment.json (used with the --execute option) should be used with the --verify option

最后使用--verify 验证。

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file custom-reassignment.json --verify
Status of partition reassignment:
Reassignment of partition [foo1,0] completed successfully
Reassignment of partition [foo2,1] completed successfully 
更新于 2017-04-12

大神好,
如果新加入的kafka节点还没有进行过数据迁移,此时在filebeat中已经配置了新节点的信息并已经运行,那么filebeat采集的数据能够进入新节点吗?
在kafka的官方文档中没有看到相关的说明,目前根据测试情况是filebeat采集的数据没有进行新节点。
请问是否有看到过官方对此的说明,或者是否有实际应用的经验可以分享一下。

必须要进行数据迁移,因为在新的节点上,你的topic是没有分区的。

多谢。
以下是我的理解:
当生产者向某个topic发送数据或者消费者向某个topic消费数据时,如果kafka集群中没有该topic,就会自动创建topic。如果集群中已经有了该topic,就不会再创建了。
在进行集群扩容时,由于集群中topic已经存在,所以新的节点上是不会再去创建相同的topic的,必须进行数据迁移。

对,当第一次创建topic时,topic的分区分配到每个kakfa现有的节点中(根据分区数)。一旦分配好了,就不会变动了。

kafka数据迁移的时候,如果生产环境下正在往某个topic生产数据,这个时候迁移这个topic的数据会不会存在数据丢失的问题

查看kafka更多相关的文章或提一个关于kafka的问题,也可以与我们一起分享文章