My Kafka topic has 12 partitions and all of them are being written to normally, but one partition can no longer be consumed, with no error reported. The same partition can be consumed with Kafka's built-in console-consumer, though. The consumer relies on automatic partition assignment (subscribe) and does not use the assign() API. When I add a new consumer group, it never starts consuming: the coordinator is found, no error is raised, it just doesn't consume.
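For context, the consumer follows the standard subscribe() pattern, roughly like the minimal sketch below. This is not the actual code: the real consumer loads its settings from conf/common.properties and uses com.konka.util.KafkaLogMessageDeSer as the value deserializer; StringDeserializer here is a stand-in.

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SubscribeSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "40.73.86.89:9492");  // one of the brokers from the config dump below
        props.put("group.id", "test-7");
        props.put("enable.auto.commit", "false");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer"); // real app: KafkaLogMessageDeSer

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // subscribe(): partitions are assigned by the group coordinator,
        // unlike assign(), which pins fixed partitions with no group management.
        consumer.subscribe(Collections.singletonList("defaultKJLog"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000); // 0.10.x poll(long)
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("%s-%d@%d%n", record.topic(), record.partition(), record.offset());
            }
            consumer.commitSync(); // manual commit, since enable.auto.commit=false
        }
    }
}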
Console output when a consumer in the new group starts up:
/home/dig/service/jdk/bin/java -Xms512m -Xmx2048m -Duser.timezone=GMT+08 -jar lib/logConsumer-2.0-SNAPSHOT.jar defaultKJLog
2020-01-16 at 11:34:51 GMT+08:00 INFO com.konka.kafkareciver.KafkaReciver 31 main - Initializing the consumer program.
2020-01-16 at 11:34:51 GMT+08:00 INFO com.konka.util.PropertyUtil 20 getProperties - Loading configuration file: conf/common.properties
2020-01-16 at 11:34:51 GMT+08:00 INFO com.konka.kafkareciver.KafkaReciver 39 main - Subscribing to topic: [defaultKJLog]
2020-01-16 at 11:34:51 GMT+08:00 INFO org.apache.kafka.common.config.AbstractConfig 180 logAll - ConsumerConfig values:
    auto.commit.interval.ms = 30000
    auto.offset.reset = earliest
    bootstrap.servers = [40.73.86.89:9492, 40.73.86.89:9592, collect.kkapp.com:9692]
    check.crcs = true
    client.id =
    connections.max.idle.ms = 540000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = test-7
    heartbeat.interval.ms = 3000
    interceptor.classes = null
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 3000
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.ms = 50
    request.timeout.ms = 180000
    retry.backoff.ms = 100
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 30000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class com.konka.util.KafkaLogMessageDeSer
2020-01-16 at 11:34:51 GMT+08:00 INFO org.apache.kafka.common.config.AbstractConfig 180 logAll - ConsumerConfig values:
    auto.commit.interval.ms = 30000
    auto.offset.reset = earliest
    bootstrap.servers = [40.73.86.89:9492, 40.73.86.89:9592, collect.kkapp.com:9692]
    check.crcs = true
    client.id = consumer-1
    connections.max.idle.ms = 540000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = test-7
    heartbeat.interval.ms = 3000
    interceptor.classes = null
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 3000
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.ms = 50
    request.timeout.ms = 180000
    retry.backoff.ms = 100
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 30000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class com.konka.util.KafkaLogMessageDeSer
2020-01-16 at 11:34:51 GMT+08:00 WARN org.apache.kafka.common.config.AbstractConfig 188 logUnused - The configuration 'batch.size' was supplied but isn't a known config.
2020-01-16 at 11:34:51 GMT+08:00 WARN org.apache.kafka.common.config.AbstractConfig 188 logUnused - The configuration 'topiclist' was supplied but isn't a known config.
2020-01-16 at 11:34:51 GMT+08:00 WARN org.apache.kafka.common.config.AbstractConfig 188 logUnused - The configuration 'linger.ms' was supplied but isn't a known config.
2020-01-16 at 11:34:51 GMT+08:00 INFO org.apache.kafka.common.utils.AppInfoParser$AppInfo 83 - Kafka version : 0.10.1.0
2020-01-16 at 11:34:51 GMT+08:00 INFO org.apache.kafka.common.utils.AppInfoParser$AppInfo 84 - Kafka commitId : 3402a74efb23d1d4
2020-01-16 at 11:34:51 GMT+08:00 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator$GroupCoordinatorResponseHandler 555 onSuccess - Discovered coordinator collect01:9492 (id: 2147483647 rack: null) for group test-7.
2020-01-16 at 11:34:51 GMT+08:00 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator 333 onJoinPrepare - Revoking previously assigned partitions [] for group test-7
2020-01-16 at 11:34:51 GMT+08:00 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator 381 sendJoinGroupRequest - (Re-)joining group test-7
2020-01-16 at 11:34:52 GMT+08:00 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1 349 onSuccess - Successfully joined group test-7 with generation 4
2020-01-16 at 11:34:52 GMT+08:00 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator 225 onJoinComplete - Setting newly assigned partitions [defaultKJLog-11, defaultKJLog-10, defaultKJLog-9, defaultKJLog-8, defaultKJLog-7, defaultKJLog-6, defaultKJLog-5, defaultKJLog-4, defaultKJLog-3, defaultKJLog-2, defaultKJLog-1, defaultKJLog-0] for group test-7
Then it just hangs there.
1. After the new consumer group joins, is there a steady flow of new messages into the topic?
2. Check whether the Kafka cluster logs show anything abnormal.
3. Check the status with the following commands and post the output.
## Describe the cluster's topics
bin/kafka-topics.sh --describe --zookeeper
## Show a consumer group's consumption details (0.9 up to, but not including, 0.10.1.0)
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group test-7
## Show a consumer group's consumption details (0.10.1.0+)
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-7
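You can also narrow it down from the client side. A minimal sketch (assuming the 0.10.1.0+ Java client; endOffsets() does not exist in earlier versions) that prints each partition's committed offset against its log-end offset; a partition whose committed offset stands still while its log-end offset keeps growing is the stuck one:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class OffsetProbe {
    // Compare the group's committed offset with the log-end offset, per partition.
    public static void probe(KafkaConsumer<?, ?> consumer, String topic) {
        List<TopicPartition> partitions = new ArrayList<>();
        for (PartitionInfo info : consumer.partitionsFor(topic)) {
            partitions.add(new TopicPartition(topic, info.partition()));
        }
        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
        for (TopicPartition tp : partitions) {
            OffsetAndMetadata committed = consumer.committed(tp); // null if nothing committed yet
            System.out.printf("%s committed=%s end=%d lag=%s%n",
                    tp,
                    committed == null ? "none" : committed.offset(),
                    endOffsets.get(tp),
                    committed == null ? "?" : endOffsets.get(tp) - committed.offset());
        }
    }
}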
The new consumer group never joined successfully. With the command kafka-consumer-groups.sh --bootstrap-server localhost:9492 --list, I cannot see the new consumer group.
There is one more command: what about the topic's cluster state (the kafka-topics.sh --describe output)?
I don't see what this command would reveal, but here is the output:
[root@collect01 bin]# ./kafka-topics.sh --describe --zookeeper localhost:2481
Topic:__consumer_offsets    PartitionCount:50    ReplicationFactor:1    Configs:segment.bytes=104857600,cleanup.policy=compact,compression.type=producer
    Topic: __consumer_offsets    Partition: 0     Leader: 0    Replicas: 0    Isr: 0
    Topic: __consumer_offsets    Partition: 1     Leader: 1    Replicas: 1    Isr: 1
    Topic: __consumer_offsets    Partition: 2     Leader: 2    Replicas: 2    Isr: 2
    Topic: __consumer_offsets    Partition: 3     Leader: 0    Replicas: 0    Isr: 0
    Topic: __consumer_offsets    Partition: 4     Leader: 1    Replicas: 1    Isr: 1
    Topic: __consumer_offsets    Partition: 5     Leader: 2    Replicas: 2    Isr: 2
    Topic: __consumer_offsets    Partition: 6     Leader: 0    Replicas: 0    Isr: 0
    Topic: __consumer_offsets    Partition: 7     Leader: 1    Replicas: 1    Isr: 1
    Topic: __consumer_offsets    Partition: 8     Leader: 2    Replicas: 2    Isr: 2
    Topic: __consumer_offsets    Partition: 9     Leader: 0    Replicas: 0    Isr: 0
    Topic: __consumer_offsets    Partition: 10    Leader: 1    Replicas: 1    Isr: 1
    Topic: __consumer_offsets    Partition: 11    Leader: 2    Replicas: 2    Isr: 2
    Topic: __consumer_offsets    Partition: 12    Leader: 0    Replicas: 0    Isr: 0
    Topic: __consumer_offsets    Partition: 13    Leader: 1    Replicas: 1    Isr: 1
    Topic: __consumer_offsets    Partition: 14    Leader: 2    Replicas: 2    Isr: 2
    Topic: __consumer_offsets    Partition: 15    Leader: 0    Replicas: 0    Isr: 0
    Topic: __consumer_offsets    Partition: 16    Leader: 1    Replicas: 1    Isr: 1
    Topic: __consumer_offsets    Partition: 17    Leader: 2    Replicas: 2    Isr: 2
    Topic: __consumer_offsets    Partition: 18    Leader: 0    Replicas: 0    Isr: 0
    Topic: __consumer_offsets    Partition: 19    Leader: 1    Replicas: 1    Isr: 1
    Topic: __consumer_offsets    Partition: 20    Leader: 2    Replicas: 2    Isr: 2
    Topic: __consumer_offsets    Partition: 21    Leader: 0    Replicas: 0    Isr: 0
    Topic: __consumer_offsets    Partition: 22    Leader: 1    Replicas: 1    Isr: 1
    Topic: __consumer_offsets    Partition: 23    Leader: 2    Replicas: 2    Isr: 2
    Topic: __consumer_offsets    Partition: 24    Leader: 0    Replicas: 0    Isr: 0
    Topic: __consumer_offsets    Partition: 25    Leader: 1    Replicas: 1    Isr: 1
    Topic: __consumer_offsets    Partition: 26    Leader: 2    Replicas: 2    Isr: 2
    Topic: __consumer_offsets    Partition: 27    Leader: 0    Replicas: 0    Isr: 0
    Topic: __consumer_offsets    Partition: 28    Leader: 1    Replicas: 1    Isr: 1
    Topic: __consumer_offsets    Partition: 29    Leader: 2    Replicas: 2    Isr: 2
    Topic: __consumer_offsets    Partition: 30    Leader: 0    Replicas: 0    Isr: 0
    Topic: __consumer_offsets    Partition: 31    Leader: 1    Replicas: 1    Isr: 1
    Topic: __consumer_offsets    Partition: 32    Leader: 2    Replicas: 2    Isr: 2
    Topic: __consumer_offsets    Partition: 33    Leader: 0    Replicas: 0    Isr: 0
    Topic: __consumer_offsets    Partition: 34    Leader: 1    Replicas: 1    Isr: 1
    Topic: __consumer_offsets    Partition: 35    Leader: 2    Replicas: 2    Isr: 2
    Topic: __consumer_offsets    Partition: 36    Leader: 0    Replicas: 0    Isr: 0
    Topic: __consumer_offsets    Partition: 37    Leader: 1    Replicas: 1    Isr: 1
    Topic: __consumer_offsets    Partition: 38    Leader: 2    Replicas: 2    Isr: 2
    Topic: __consumer_offsets    Partition: 39    Leader: 0    Replicas: 0    Isr: 0
    Topic: __consumer_offsets    Partition: 40    Leader: 1    Replicas: 1    Isr: 1
    Topic: __consumer_offsets    Partition: 41    Leader: 2    Replicas: 2    Isr: 2
    Topic: __consumer_offsets    Partition: 42    Leader: 0    Replicas: 0    Isr: 0
    Topic: __consumer_offsets    Partition: 43    Leader: 1    Replicas: 1    Isr: 1
    Topic: __consumer_offsets    Partition: 44    Leader: 2    Replicas: 2    Isr: 2
    Topic: __consumer_offsets    Partition: 45    Leader: 0    Replicas: 0    Isr: 0
    Topic: __consumer_offsets    Partition: 46    Leader: 1    Replicas: 1    Isr: 1
    Topic: __consumer_offsets    Partition: 47    Leader: 2    Replicas: 2    Isr: 2
    Topic: __consumer_offsets    Partition: 48    Leader: 0    Replicas: 0    Isr: 0
    Topic: __consumer_offsets    Partition: 49    Leader: 1    Replicas: 1    Isr: 1
Topic:defaultKJLog    PartitionCount:12    ReplicationFactor:2    Configs:
    Topic: defaultKJLog    Partition: 0     Leader: 2    Replicas: 1,2    Isr: 2,1
    Topic: defaultKJLog    Partition: 1     Leader: 2    Replicas: 2,3    Isr: 2,3
    Topic: defaultKJLog    Partition: 2     Leader: 4    Replicas: 4,0    Isr: 0,4
    Topic: defaultKJLog    Partition: 3     Leader: 4    Replicas: 4,0    Isr: 4,0
    Topic: defaultKJLog    Partition: 4     Leader: 0    Replicas: 0,1    Isr: 0,1
    Topic: defaultKJLog    Partition: 5     Leader: 1    Replicas: 1,3    Isr: 1,3
    Topic: defaultKJLog    Partition: 6     Leader: 2    Replicas: 2,4    Isr: 2,4
    Topic: defaultKJLog    Partition: 7     Leader: 3    Replicas: 3,0    Isr: 0,3
    Topic: defaultKJLog    Partition: 8     Leader: 4    Replicas: 4,1    Isr: 1,4
    Topic: defaultKJLog    Partition: 9     Leader: 2    Replicas: 2,3    Isr: 2,3
    Topic: defaultKJLog    Partition: 10    Leader: 4    Replicas: 4,0    Isr: 0,4
    Topic: defaultKJLog    Partition: 11    Leader: 1    Replicas: 2,1    Isr: 1,2
Topic:realTimeLog    PartitionCount:6    ReplicationFactor:3    Configs:
    Topic: realTimeLog    Partition: 0    Leader: 2    Replicas: 2,0,1    Isr: 2,1,0
    Topic: realTimeLog    Partition: 1    Leader: 0    Replicas: 0,1,2    Isr: 0,1,2
    Topic: realTimeLog    Partition: 2    Leader: 1    Replicas: 1,2,0    Isr: 1,0,2
    Topic: realTimeLog    Partition: 3    Leader: 2    Replicas: 2,1,0    Isr: 2,1,0
    Topic: realTimeLog    Partition: 4    Leader: 0    Replicas: 0,2,1    Isr: 0,1,2
    Topic: realTimeLog    Partition: 5    Leader: 1    Replicas: 1,0,2    Isr: 1,0,2
Topic:test2    PartitionCount:6    ReplicationFactor:3    Configs:
    Topic: test2    Partition: 0    Leader: 2    Replicas: 2,0,1    Isr: 2,0,1
    Topic: test2    Partition: 1    Leader: 0    Replicas: 0,1,2    Isr: 0,1,2
    Topic: test2    Partition: 2    Leader: 1    Replicas: 1,2,0    Isr: 1,0,2
    Topic: test2    Partition: 3    Leader: 2    Replicas: 2,1,0    Isr: 2,1,0
    Topic: test2    Partition: 4    Leader: 0    Replicas: 0,2,1    Isr: 0,1,2
    Topic: test2    Partition: 5    Leader: 1    Replicas: 1,0,2    Isr: 1,0,2
The partition that cannot be consumed is: Topic: defaultKJLog Partition: 7 Leader: 3 Replicas: 3,0 Isr: 0,3
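(For reference, one way to double-check that the brokers still serve reads for this partition is to assign() it directly, which skips group coordination entirely. A rough sketch, with StringDeserializer standing in for our real deserializer:)

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class DirectReadSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "40.73.86.89:9492");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        TopicPartition tp = new TopicPartition("defaultKJLog", 7);
        consumer.assign(Collections.singleton(tp));          // no group coordinator involved
        consumer.seekToBeginning(Collections.singleton(tp)); // oldest retained offset
        ConsumerRecords<String, String> records = consumer.poll(5000);
        System.out.printf("fetched %d records from %s%n", records.count(), tp);
        consumer.close();
    }
}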
Hmm, two days ago I did change this topic's partition replica assignment with this command.
./kafka-reassign-partitions.sh --zookeeper localhost:2481 --reassignment-json-file result.json --execute

Contents of result.json:

{"version":1,
 "partitions":[
  {"topic":"defaultKJLog","partition":2,  "replicas":[4,0]},
  {"topic":"defaultKJLog","partition":7,  "replicas":[3,0]},
  {"topic":"defaultKJLog","partition":4,  "replicas":[0,1]},
  {"topic":"defaultKJLog","partition":1,  "replicas":[2,3]},
  {"topic":"defaultKJLog","partition":9,  "replicas":[2,3]},
  {"topic":"defaultKJLog","partition":3,  "replicas":[4,0]},
  {"topic":"defaultKJLog","partition":6,  "replicas":[2,4]},
  {"topic":"defaultKJLog","partition":11, "replicas":[2,1]},
  {"topic":"defaultKJLog","partition":0,  "replicas":[1,2]},
  {"topic":"defaultKJLog","partition":8,  "replicas":[4,1]},
  {"topic":"defaultKJLog","partition":5,  "replicas":[1,3]},
  {"topic":"defaultKJLog","partition":10, "replicas":[4,0]}]}
Every reassignment reported success, too:
[root@collect01 bin]# ./kafka-reassign-partitions.sh --zookeeper localhost:2481 --reassignment-json-file result.json --verify
Status of partition reassignment:
Reassignment of partition defaultKJLog-3 completed successfully
Reassignment of partition defaultKJLog-5 completed successfully
Reassignment of partition defaultKJLog-4 completed successfully
Reassignment of partition defaultKJLog-10 completed successfully
Reassignment of partition defaultKJLog-0 completed successfully
Reassignment of partition defaultKJLog-7 completed successfully
Reassignment of partition defaultKJLog-11 completed successfully
Reassignment of partition defaultKJLog-1 completed successfully
Reassignment of partition defaultKJLog-8 completed successfully
Reassignment of partition defaultKJLog-6 completed successfully
Reassignment of partition defaultKJLog-2 completed successfully
Reassignment of partition defaultKJLog-9 completed successfully
Your problem is likely with broker.id=3; that node is not healthy.

But even when I take the broker.id=3 node out, it still doesn't work.
Run
./kafka-topics.sh --describe --zookeeper localhost:2481
again, but only for the defaultKJLog topic.
[root@collect01 bin]# ./kafka-topics.sh --zookeeper localhost:2481 --topic defaultKJLog --describe
Topic:defaultKJLog    PartitionCount:12    ReplicationFactor:2    Configs:
    Topic: defaultKJLog    Partition: 0     Leader: 2    Replicas: 1,2    Isr: 2,1
    Topic: defaultKJLog    Partition: 1     Leader: 2    Replicas: 2,3    Isr: 2,3
    Topic: defaultKJLog    Partition: 2     Leader: 4    Replicas: 4,0    Isr: 0,4
    Topic: defaultKJLog    Partition: 3     Leader: 4    Replicas: 4,0    Isr: 4,0
    Topic: defaultKJLog    Partition: 4     Leader: 0    Replicas: 0,1    Isr: 0,1
    Topic: defaultKJLog    Partition: 5     Leader: 1    Replicas: 1,3    Isr: 1,3
    Topic: defaultKJLog    Partition: 6     Leader: 2    Replicas: 2,4    Isr: 2,4
    Topic: defaultKJLog    Partition: 7     Leader: 3    Replicas: 3,0    Isr: 0,3
    Topic: defaultKJLog    Partition: 8     Leader: 4    Replicas: 4,1    Isr: 1,4
    Topic: defaultKJLog    Partition: 9     Leader: 2    Replicas: 2,3    Isr: 2,3
    Topic: defaultKJLog    Partition: 10    Leader: 4    Replicas: 4,0    Isr: 0,4
    Topic: defaultKJLog    Partition: 11    Leader: 1    Replicas: 2,1    Isr: 1,2
cat > increase-replication-factor.json <<EOF
{"version":1,
 "partitions":[
  {"topic":"defaultKJLog","partition":7,"replicas":[1,4]}]
}
EOF
Move this partition off broker 3 and it should be fine; your broker 3 has a problem.
Can you tell what exactly is wrong with broker 3?
None of your other topics are on broker 3; only this problematic partition sits on node 3. If the test above works after the move, that localizes the problem to node 3.
To diagnose node 3 itself, you will have to look at its log output.
But my partitions 1, 5, and 9 are also on broker 3; it's just that broker.id=3 isn't their leader. If node 3 is the problem, couldn't I simply shut that node down? I did shut it down, and the problem was still there.
There are no errors in node 3's Kafka server.log either. The partition just cannot be consumed.
[2020-01-17 02:46:31,901] INFO [ProducerStateManager partition=defaultKJLog-7] Writing producer snapshot at offset 16220758524 (kafka.log.ProducerStateManager)
[2020-01-17 02:46:31,902] INFO [Log partition=defaultKJLog-7, dir=/data/kafka-logs] Rolled new log segment at offset 16220758524 in 2 ms. (kafka.log.Log)
[2020-01-17 02:52:42,610] INFO [GroupMetadataManager brokerId=3] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2020-01-17 02:53:11,877] INFO [Log partition=defaultKJLog-7, dir=/data/kafka-logs] Found deletable segments with base offsets [14059201260] due to retention time 604800000ms breach (kafka.log.Log)
[2020-01-17 02:53:11,877] INFO [Log partition=defaultKJLog-7, dir=/data/kafka-logs] Scheduling log segment [baseOffset 14059201260, size 1073723120] for deletion. (kafka.log.Log)
[2020-01-17 02:53:11,877] INFO [Log partition=defaultKJLog-7, dir=/data/kafka-logs] Incrementing log start offset to 14063800336 (kafka.log.Log)
[2020-01-17 02:53:11,930] INFO Cleared earliest 0 entries from epoch cache based on passed offset 14063800336 leaving 12 in EpochFile for partition defaultKJLog-7 (kafka.server.epoch.LeaderEpochFileCache)
[2020-01-17 02:54:11,877] INFO [Log partition=defaultKJLog-7, dir=/data/kafka-logs] Deleting segment 14059201260 (kafka.log.Log)
[2020-01-17 02:54:12,005] INFO Deleted log /data/kafka-logs/defaultKJLog-7/00000000014059201260.log.deleted. (kafka.log.LogSegment)
[2020-01-17 02:54:12,010] INFO Deleted offset index /data/kafka-logs/defaultKJLog-7/00000000014059201260.index.deleted. (kafka.log.LogSegment)
[2020-01-17 02:54:12,011] INFO Deleted time index /data/kafka-logs/defaultKJLog-7/00000000014059201260.timeindex.deleted. (kafka.log.LogSegment)
[2020-01-17 03:02:42,610] INFO [GroupMetadataManager brokerId=3] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2020-01-17 03:04:42,948] INFO [ProducerStateManager partition=defaultKJLog-7] Writing producer snapshot at offset 16225374460 (kafka.log.ProducerStateManager)
[2020-01-17 03:04:42,950] INFO [Log partition=defaultKJLog-7, dir=/data/kafka-logs] Rolled new log segment at offset 16225374460 in 2 ms. (kafka.log.Log)
Verified: once partition 7 is moved onto 0 and 4, everything is OK. But how do I track down the root cause now?
Restart node 3 first; that should bring it back. I can only analyze further if there are actual error messages.
PS: my suspicion is that during the earlier data reassignment, a replication sync problem left this node effectively out of the cluster's control.