A collection of commonly used Kafka commands.
# Administration
## Create a topic (4 partitions, 2 replicas)
### Kafka version < 2.2
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 4 --topic test
### Kafka version >= 2.2
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
## Partition expansion
### Kafka version < 2.2
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic1 --partitions 2
### Kafka version >= 2.2
bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic topic1 --partitions 2
## Delete a topic
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
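On Kafka 2.2 and later the same deletion can also be issued directly against a broker (a minimal equivalent, assuming the broker listens on localhost:9092):
bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic test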
# Queries
## Describe the cluster's topics
bin/kafka-topics.sh --describe --zookeeper 127.0.0.1:2181
## Describe the cluster (newer versions)
bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic foo --describe
## List topics
bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --list
## List topics (0.9+)
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
## List consumer groups (stored in ZooKeeper)
bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --list
## List consumer groups (0.9+)
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --list
## List consumer groups (0.10+)
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
## Show a consumer group's consumption details (only for offsets stored in ZooKeeper)
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group test
## Show a consumer group's consumption details (0.9 up to, but not including, 0.10.1.0)
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group test-consumer-group
## Show a consumer group's consumption details (0.10.1.0+)
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group
# Producing and consuming
## Producer
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
## Consumer
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test
## Producer (0.9+)
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test --producer.config config/producer.properties
## Consumer (0.9+)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --new-consumer --from-beginning --consumer.config config/consumer.properties
## Consumer (latest)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning --consumer.config config/consumer.properties
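For reference, a minimal sketch of what config/consumer.properties might contain; the group id and offset-reset policy below are illustrative assumptions rather than values from this article:
bootstrap.servers=localhost:9092
group.id=test-consumer-group
auto.offset.reset=earliest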
## kafka-verifiable-consumer.sh (consumer events, e.g. offset commits)
bin/kafka-verifiable-consumer.sh --broker-list localhost:9092 --topic test --group-id groupName
## More advanced usage
bin/kafka-simple-consumer-shell.sh --broker-list localhost:9092 --topic test --partition 0 --offset 1234 --max-messages 10
# Switching the leader
## Kafka version <= 2.4
bin/kafka-preferred-replica-election.sh --zookeeper zk_host:port/chroot
## Newer Kafka versions
bin/kafka-preferred-replica-election.sh --bootstrap-server broker_host:port
# Kafka's built-in load-testing command
bin/kafka-producer-perf-test.sh --topic test --num-records 100 --record-size 1 --throughput 100 --producer-props bootstrap.servers=localhost:9092
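A somewhat more realistic run might look like the following; the record count, record size, and acks setting are illustrative assumptions (--throughput -1 disables throttling):
bin/kafka-producer-perf-test.sh --topic test --num-records 500000 --record-size 1024 --throughput -1 --producer-props bootstrap.servers=localhost:9092 acks=1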
# Continuously sending messages
Continuously sends messages to the specified topic, and every message sent gets a response line:
bin/kafka-verifiable-producer.sh --broker-list $(hostname -i):9092 --topic test --max-messages 100000
# zookeeper-shell.sh
If the ZooKeeper used by the Kafka cluster is configured with a chroot path, append the /path:
bin/zookeeper-shell.sh localhost:2181[/path]
ls /brokers/ids
get /brokers/ids/0
# Resetting consumer offsets
For example, to reset a consumer group's offsets to the latest offsets:
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --group consumergroup1 --topic topic1 --to-latest
TOPIC PARTITION NEW-OFFSET
topic1 0 0
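Note: on recent versions this command only previews the new offsets (a dry run); to actually apply the reset, append --execute, for example:
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --group consumergroup1 --topic topic1 --to-latest --execute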
For details, see: https://www.orchome.com/35
# Migrating partitions
## Create the reassignment JSON
cat > increase-replication-factor.json <<EOF
{"version":1,
 "partitions":[
  {"topic":"__consumer_offsets","partition":0,"replicas":[0,1]},
  {"topic":"__consumer_offsets","partition":1,"replicas":[0,1]},
  {"topic":"__consumer_offsets","partition":2,"replicas":[0,1]},
  {"topic":"__consumer_offsets","partition":3,"replicas":[0,1]},
  {"topic":"__consumer_offsets","partition":4,"replicas":[0,1]},
  {"topic":"__consumer_offsets","partition":5,"replicas":[0,1]},
  {"topic":"__consumer_offsets","partition":6,"replicas":[0,1]},
  {"topic":"__consumer_offsets","partition":7,"replicas":[0,1]},
  {"topic":"__consumer_offsets","partition":8,"replicas":[0,1]},
  {"topic":"__consumer_offsets","partition":9,"replicas":[0,1]},
  {"topic":"__consumer_offsets","partition":10,"replicas":[0,1]},
  {"topic":"__consumer_offsets","partition":11,"replicas":[0,1]},
  {"topic":"__consumer_offsets","partition":12,"replicas":[0,1]},
  {"topic":"__consumer_offsets","partition":13,"replicas":[0,1]},
  {"topic":"__consumer_offsets","partition":14,"replicas":[0,1]},
  {"topic":"__consumer_offsets","partition":15,"replicas":[0,1]},
  {"topic":"__consumer_offsets","partition":16,"replicas":[0,1]},
  {"topic":"__consumer_offsets","partition":17,"replicas":[0,1]},
  {"topic":"__consumer_offsets","partition":18,"replicas":[0,1]},
  {"topic":"__consumer_offsets","partition":19,"replicas":[0,1]},
  {"topic":"__consumer_offsets","partition":20,"replicas":[0,1]},
  {"topic":"__consumer_offsets","partition":21,"replicas":[0,1]},
  {"topic":"__consumer_offsets","partition":22,"replicas":[0,1]},
  {"topic":"__consumer_offsets","partition":23,"replicas":[0,1]},
  {"topic":"__consumer_offsets","partition":24,"replicas":[0,1]},
  {"topic":"__consumer_offsets","partition":25,"replicas":[0,1]},
  {"topic":"__consumer_offsets","partition":26,"replicas":[0,1]},
  {"topic":"__consumer_offsets","partition":27,"replicas":[0,1]},
  {"topic":"__consumer_offsets","partition":28,"replicas":[0,1]},
  {"topic":"__consumer_offsets","partition":29,"replicas":[0,1]},
  {"topic":"__consumer_offsets","partition":30,"replicas":[0,1]},
  {"topic":"__consumer_offsets","partition":31,"replicas":[0,1]},
  {"topic":"__consumer_offsets","partition":32,"replicas":[0,1]},
  {"topic":"__consumer_offsets","partition":33,"replicas":[0,1]},
  {"topic":"__consumer_offsets","partition":34,"replicas":[0,1]},
  {"topic":"__consumer_offsets","partition":35,"replicas":[0,1]},
  {"topic":"__consumer_offsets","partition":36,"replicas":[0,1]},
  {"topic":"__consumer_offsets","partition":37,"replicas":[0,1]},
  {"topic":"__consumer_offsets","partition":38,"replicas":[0,1]},
  {"topic":"__consumer_offsets","partition":39,"replicas":[0,1]},
  {"topic":"__consumer_offsets","partition":40,"replicas":[0,1]},
  {"topic":"__consumer_offsets","partition":41,"replicas":[0,1]},
  {"topic":"__consumer_offsets","partition":42,"replicas":[0,1]},
  {"topic":"__consumer_offsets","partition":43,"replicas":[0,1]},
  {"topic":"__consumer_offsets","partition":44,"replicas":[0,1]},
  {"topic":"__consumer_offsets","partition":45,"replicas":[0,1]},
  {"topic":"__consumer_offsets","partition":46,"replicas":[0,1]},
  {"topic":"__consumer_offsets","partition":47,"replicas":[0,1]},
  {"topic":"__consumer_offsets","partition":48,"replicas":[0,1]},
  {"topic":"__consumer_offsets","partition":49,"replicas":[0,1]}
 ]
}
EOF
## Execute
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --execute
## Verify
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --verify
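On newer Kafka releases (roughly 2.5 and later) kafka-reassign-partitions.sh can also talk to the brokers directly instead of ZooKeeper; a sketch of the equivalent calls, assuming a broker at localhost:9092:
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file increase-replication-factor.json --execute
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file increase-replication-factor.json --verify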
Hi, a question: when I run Kafka's built-in load-testing command it throws this error. Why would that be? My normal producing and consuming both work fine.
Two possibilities:
1. The load test is too heavy, so some messages sit in the send queue for more than 60000 ms.
2. It has nothing to do with the load test: if the timeouts happen right from the start, the cluster address you are sending to is simply unreachable; telnet every node and check the network.
I have tested all of them and every port is reachable, so where could the problem be?
The full details of this issue moved here: Kafka built-in load test reports org.apache.kafka.common.errors.TimeoutException
I'm still the newbie who asked you this afternoon, 扳手哥.
You told me to look at the consumer group lag for the partitions being migrated, and I thought about it.
But for the partition I'm migrating, the target broker_id already shows up in the ISR, so I believe the lag for the consumer group involved in this migration is already 0 (I can't even find that consumer group id...).
I looked at the comments in the source code.
This is the partition reassignment flow:
I think I have reached step 8 and am stuck at step 9 or step 10.
For example, if OAR = {1, 2, 3} and RAR = {4, 5, 6}, the values in the assigned replica (AR) and the leader/isr path in ZK may go through the following transition:
AR                leader/isr
{1,2,3}           1/{1,2,3}           (initial state)
{1,2,3,4,5,6}     1/{1,2,3}           (step 2)
{1,2,3,4,5,6}     1/{1,2,3,4,5,6}     (step 4)
{1,2,3,4,5,6}     4/{1,2,3,4,5,6}     (step 7)
{1,2,3,4,5,6}     4/{4,5,6}           (step 8)
{4,5,6}           4/{4,5,6}           (step 10)
Run the commands from this directory:
/opt/kafka_2.12-2.2.0/
1. Start the bundled ZooKeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
2. Start the Kafka server
bin/kafka-server-start.sh config/server.properties
3. Create a topic
bin/kafka-topics.sh --create --topic test --bootstrap-server 192.168.218.128:9092 --partitions 3 --replication-factor 1
List all topics
bin/kafka-topics.sh --list --bootstrap-server 192.168.218.128:9092
Describe a specific topic
bin/kafka-topics.sh --bootstrap-server 192.168.218.128:9092 --describe --topic test
4. Start a console producer
bin/kafka-console-producer.sh --broker-list 192.168.218.128:9092 --topic test
5. Start a console consumer
bin/kafka-console-consumer.sh --bootstrap-server 192.168.218.128:9092 --topic test
Is this the most basic setup?
Yes.
Hello, blogger:
I'd like to ask about a Kafka replica-expansion problem (2 brokers, 2 partitions, 1 replica).
Today I created a topic with 2 partitions and 1 replica, and later wanted to change the replication factor to 2, so I followed these steps:
1. Create the JSON file
{ "partitions": [ { "topic":"queue", "partition": 0, "replicas": [1,2] }, { "topic": "queue", "partition": 1, "replicas": [2,1] } ], "version":1 }
2. ./bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file test.json --execute
3. ./bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file test.json --verify
All of the steps above ran in the terminal without any error.
But after I stop one of the two Kafka brokers, the consumer can no longer consume any data. I don't know how to troubleshoot this; I hope you can take a look.
The __consumer_offsets topic also needs to be expanded to 2 replicas.
You mean the topic that stores the offsets also needs 2 replicas, right? I just checked: of my two brokers, one holds the odd-numbered partitions and the other holds the even-numbered ones.
So now I need to increase the replica count of the offsets topic?
I have now expanded __consumer_offsets to 2 replicas. When I stop the second broker, consumption still works; but when I stop the first broker, it no longer works.
The error is: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition
One strange thing: for the __consumer_offsets replicas I just expanded, the leader is shown as 1 for every partition.
Post your topic's status in a question thread so I can take a look; let's continue there.
Hello: https://www.orchome.com/1912
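A side note on the "leader is always 1" symptom above: adding replicas does not move partition leadership by itself. To inspect the current leaders, a describe call like the following can be used (the ZooKeeper address here is assumed to match the earlier examples), and the preferred-replica-election command from the "Switching the leader" section can then move leadership back to the preferred replicas:
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic __consumer_offsets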
One more question: when I write a producer in Java with multiple topics, only one of the topics works; the others all throw errors, and multiple sonsumer-console processes pile up on Linux, after which the consumer stops working entirely.
Typo: I meant console-consumer processes.
I'm afraid I don't follow...
./kafka-consumer-groups.sh --list --new-consumer --bootstrap-server localhost:9092
Exception in thread "main" joptsimple.UnrecognizedOptionException: new-consumer is not a recognized option
        at joptsimple.OptionException.unrecognizedOption(OptionException.java:108)
        at joptsimple.OptionParser.handleLongOptionToken(OptionParser.java:510)
        at joptsimple.OptionParserState$2.handleArgument(OptionParserState.java:56)
        at joptsimple.OptionParser.parse(OptionParser.java:396)
        at kafka.admin.ConsumerGroupCommand$ConsumerGroupCommandOptions.<init>(ConsumerGroupCommand.scala:725)
        at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:42)
        at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)
Which version are you on? Try removing --new-consumer.
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --list
Running this command gives the error: new-consumer is not a recognized option
Hello, is there an article on creating and looking up topics with the Java API?
Do you want to create topics through the API, or query them? For querying you can refer to the KafkaOffsetMonitor source code.
What I'm building is: query first, and if the topic does not exist, create it.
I tried the TopicCommand.main(options) approach I found online, but it errors out and will not run...
Topics are created automatically, so why create them yourself?
Huh? When calling producer.send(topic, key, value), if Kafka does not have the topic, it gets created automatically?
Yes.
Oh, I see. Thanks a lot, learned something. Thank you!
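For completeness: that automatic creation only happens when the broker has auto.create.topics.enable=true (the default). If you do want the "query first, create if missing" flow asked about above, a minimal sketch using the Java AdminClient (available since Kafka 0.11) could look like the following; the topic name, partition count, replication factor, and broker address are illustrative assumptions:
import java.util.Collections;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class EnsureTopicExists {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Illustrative broker address; replace with your own
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // Look up the names of all existing topics
            Set<String> topics = admin.listTopics().names().get();
            if (!topics.contains("test")) {
                // Create "test" with 3 partitions and replication factor 1
                NewTopic topic = new NewTopic("test", 3, (short) 1);
                admin.createTopics(Collections.singleton(topic)).all().get();
            }
        }
    }
}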
Do you have a WeChat public account or anything like that?
https://www.orchome.com/kafka/index
It's in the articles on the home page.
Okay, got it. Thank you, much appreciated.
My Kafka's memory usage will not come down. What is going on? The messages are already past their retention time.
Hmm, I remember having answered your question already.
Kafka runs on the JVM and makes full use of the available memory; when another process starts up, that memory gets released.
The .sh scripts will not run in my console. Isn't it only possible to run the .bat files?
That depends on your operating system: on Linux use the .sh scripts, on Windows the .bat ones.
Sorry, that was a silly question on my part.