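My local Kafka consumer code cannot fetch messages from a Kafka cluster running on a Windows virtual machine. It receives nothing and reports no error; it just keeps looping. What could be the cause?
Kafka version: kafka_2.12-2.5.0
ZooKeeper version: zookeeper-3.6.3
The Kafka cluster on the Windows VM was set up roughly following this blog post: https://blog.csdn.net/weixin_38040473/article/details/106716439. Consuming with the Kafka client on the VM works normally, and my local code can also produce messages normally.
In the Kafka configuration file server.properties I changed only roughly these properties; everything else is unchanged: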
broker.id=0
listeners=PLAINTEXT://192.168.2.99:9092
port=9092
host.name=192.168.2.99
zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
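import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class MyKafkaFilterConsumer {
    public static void main(String[] args) {
        // 1. Define the configuration with Properties
        Properties properties = new Properties();
        // Broker addresses the consumer connects to
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.99:9092,192.168.2.99:9093,192.168.2.99:9094");
        // Key deserializer; must match the producer's key serializer
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Value deserializer; must match the producer's value serializer
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Auto commit is disabled; offsets are committed manually below
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        // Consumer group ID (any name); consumers with the same ID belong to the same group
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "583-con");
        // 2. Create the consumer
        Consumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        // 3. Subscribe to the topic(s) to read; several can be listed
        consumer.subscribe(Arrays.asList("testkkk"));
        while (true) {
            // Poll for records, waiting up to 100 seconds
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(100));
            for (ConsumerRecord<String, String> record : records) {
                // Print the key fields of each record
                // System.out.println("kafka-filter-key: "+record.key()+", kafka-filter-value: "+record.value()+", kafka-filter-partition: "+record.partition()+",kafka-filter-offset: "+record.offset());
                System.out.println("topic = " + record.topic() + " offset = " + record.offset() + " value = " + record.value());
            }
            // Commit the offsets returned by the last poll
            consumer.commitAsync();
        }
    }
}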
Console output (no error appears):
"D:\javaPro\idea\IntelliJ IDEA 2019.3\jbr\bin\java.exe" -agentlib:jdwp=transport=dt_socket,address=127.0.0.1:60504,suspend=y,server=n -Dfile.encoding=UTF-8 -classpath "D:\javaPro\ideaPro\新建文件夹\kkT\target\classes;C:\Users\微凉\.m2\repository\org\springframework\boot\spring-boot-starter-web\2.4.0\spring-boot-starter-web-2.4.0.jar;C:\Users\微凉\.m2\repository\org\springframework\boot\spring-boot-starter\2.4.0\spring-boot-starter-2.4.0.jar;C:\Users\微凉\.m2\repository\org\springframework\boot\spring-boot\2.4.0\spring-boot-2.4.0.jar;C:\Users\微凉\.m2\repository\org\springframework\boot\spring-boot-autoconfigure\2.4.0\spring-boot-autoconfigure-2.4.0.jar;C:\Users\微凉\.m2\repository\org\springframework\boot\spring-boot-starter-logging\2.4.0\spring-boot-starter-logging-2.4.0.jar;C:\Users\微凉\.m2\repository\ch\qos\logback\logback-classic\1.2.3\logback-classic-1.2.3.jar;C:\Users\微凉\.m2\repository\ch\qos\logback\logback-core\1.2.3\logback-core-1.2.3.jar;C:\Users\微凉\.m2\repository\org\apache\logging\log4j\log4j-to-slf4j\2.13.3\log4j-to-slf4j-2.13.3.jar;C:\Users\微凉\.m2\repository\org\apache\logging\log4j\log4j-api\2.13.3\log4j-api-2.13.3.jar;C:\Users\微凉\.m2\repository\org\slf4j\jul-to-slf4j\1.7.30\jul-to-slf4j-1.7.30.jar;C:\Users\微凉\.m2\repository\jakarta\annotation\jakarta.annotation-api\1.3.5\jakarta.annotation-api-1.3.5.jar;C:\Users\微凉\.m2\repository\org\yaml\snakeyaml\1.27\snakeyaml-1.27.jar;C:\Users\微凉\.m2\repository\org\springframework\boot\spring-boot-starter-json\2.4.0\spring-boot-starter-json-2.4.0.jar;C:\Users\微凉\.m2\repository\com\fasterxml\jackson\core\jackson-databind\2.11.3\jackson-databind-2.11.3.jar;C:\Users\微凉\.m2\repository\com\fasterxml\jackson\core\jackson-annotations\2.11.3\jackson-annotations-2.11.3.jar;C:\Users\微凉\.m2\repository\com\fasterxml\jackson\core\jackson-core\2.11.3\jackson-core-2.11.3.jar;C:\Users\微凉\.m2\repository\com\fasterxml\jackson\datatype\jackson-datatype-jdk8\2.11.3\jackson-datatype-jdk8-2.11.3.jar;C:\Users\微凉\.m2\repository\com\fasterxml\jackson\da
tatype\jackson-datatype-jsr310\2.11.3\jackson-datatype-jsr310-2.11.3.jar;C:\Users\微凉\.m2\repository\com\fasterxml\jackson\module\jackson-module-parameter-names\2.11.3\jackson-module-parameter-names-2.11.3.jar;C:\Users\微凉\.m2\repository\org\springframework\boot\spring-boot-starter-tomcat\2.4.0\spring-boot-starter-tomcat-2.4.0.jar;C:\Users\微凉\.m2\repository\org\apache\tomcat\embed\tomcat-embed-core\9.0.39\tomcat-embed-core-9.0.39.jar;C:\Users\微凉\.m2\repository\org\glassfish\jakarta.el\3.0.3\jakarta.el-3.0.3.jar;C:\Users\微凉\.m2\repository\org\apache\tomcat\embed\tomcat-embed-websocket\9.0.39\tomcat-embed-websocket-9.0.39.jar;C:\Users\微凉\.m2\repository\org\springframework\spring-web\5.3.1\spring-web-5.3.1.jar;C:\Users\微凉\.m2\repository\org\springframework\spring-beans\5.3.1\spring-beans-5.3.1.jar;C:\Users\微凉\.m2\repository\org\springframework\spring-webmvc\5.3.1\spring-webmvc-5.3.1.jar;C:\Users\微凉\.m2\repository\org\springframework\spring-aop\5.3.1\spring-aop-5.3.1.jar;C:\Users\微凉\.m2\repository\org\springframework\spring-context\5.3.1\spring-context-5.3.1.jar;C:\Users\微凉\.m2\repository\org\springframework\spring-expression\5.3.1\spring-expression-5.3.1.jar;C:\Users\微凉\.m2\repository\org\springframework\spring-core\5.3.1\spring-core-5.3.1.jar;C:\Users\微凉\.m2\repository\org\springframework\spring-jcl\5.3.1\spring-jcl-5.3.1.jar;C:\Users\微凉\.m2\repository\org\apache\kafka\kafka-clients\2.4.1\kafka-clients-2.4.1.jar;C:\Users\微凉\.m2\repository\com\github\luben\zstd-jni\1.4.3-1\zstd-jni-1.4.3-1.jar;C:\Users\微凉\.m2\repository\org\lz4\lz4-java\1.6.0\lz4-java-1.6.0.jar;C:\Users\微凉\.m2\repository\org\xerial\snappy\snappy-java\1.1.7.3\snappy-java-1.1.7.3.jar;C:\Users\微凉\.m2\repository\org\slf4j\slf4j-api\1.7.30\slf4j-api-1.7.30.jar;D:\javaPro\idea\IntelliJ IDEA 2019.3\lib\idea_rt.jar" cn.xp.consumer.MyKafkaFilterConsumer
Connected to the target VM, address: '127.0.0.1:0', transport: 'socket'
16:05:57.174 [main] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [192.168.2.99:9092, 192.168.2.99:9093, 192.168.2.99:9094]
check.crcs = true
client.dns.lookup = default
client.id =
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = 583-con
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
16:05:57.179 [main] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-583-con-1, groupId=583-con] Initializing the Kafka consumer
16:05:57.861 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 2.4.1
16:05:57.862 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: c57222ae8cd7866b
16:05:57.862 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1620461157856
16:05:57.864 [main] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-583-con-1, groupId=583-con] Kafka consumer initialized
16:05:57.865 [main] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-583-con-1, groupId=583-con] Subscribed to topic(s): testkkk
16:05:57.866 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-583-con-1, groupId=583-con] Sending FindCoordinator request to broker 192.168.2.99:9093 (id: -2 rack: null)
16:05:57.996 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-583-con-1, groupId=583-con] Initiating connection to node 192.168.2.99:9093 (id: -2 rack: null) using address /192.168.2.99
16:05:58.004 [main] DEBUG org.apache.kafka.common.network.Selector - [Consumer clientId=consumer-583-con-1, groupId=583-con] Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -2
16:05:58.004 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-583-con-1, groupId=583-con] Completed connection to node -2. Fetching API versions.
16:05:58.004 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-583-con-1, groupId=583-con] Initiating API versions fetch from node -2.
16:05:58.126 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-583-con-1, groupId=583-con] Recorded API versions for node -2: (Produce(0): 0 to 8 [usable: 8], Fetch(1): 0 to 11 [usable: 11], ListOffsets(2): 0 to 5 [usable: 5], Metadata(3): 0 to 9 [usable: 9], LeaderAndIsr(4): 0 to 4 [usable: 4], StopReplica(5): 0 to 2 [usable: 2], UpdateMetadata(6): 0 to 6 [usable: 6], ControlledShutdown(7): 0 to 3 [usable: 3], OffsetCommit(8): 0 to 8 [usable: 8], OffsetFetch(9): 0 to 7 [usable: 6], FindCoordinator(10): 0 to 3 [usable: 3], JoinGroup(11): 0 to 7 [usable: 6], Heartbeat(12): 0 to 4 [usable: 4], LeaveGroup(13): 0 to 4 [usable: 4], SyncGroup(14): 0 to 5 [usable: 4], DescribeGroups(15): 0 to 5 [usable: 5], ListGroups(16): 0 to 3 [usable: 3], SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 3 [usable: 3], CreateTopics(19): 0 to 5 [usable: 5], DeleteTopics(20): 0 to 4 [usable: 4], DeleteRecords(21): 0 to 1 [usable: 1], InitProducerId(22): 0 to 3 [usable: 2], OffsetForLeaderEpoch(23): 0 to 3 [usable: 3], AddPartitionsToTxn(24): 0 to 1 [usable: 1], AddOffsetsToTxn(25): 0 to 1 [usable: 1], EndTxn(26): 0 to 1 [usable: 1], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 to 3 [usable: 2], DescribeAcls(29): 0 to 2 [usable: 1], CreateAcls(30): 0 to 2 [usable: 1], DeleteAcls(31): 0 to 2 [usable: 1], DescribeConfigs(32): 0 to 2 [usable: 2], AlterConfigs(33): 0 to 1 [usable: 1], AlterReplicaLogDirs(34): 0 to 1 [usable: 1], DescribeLogDirs(35): 0 to 1 [usable: 1], SaslAuthenticate(36): 0 to 2 [usable: 1], CreatePartitions(37): 0 to 2 [usable: 1], CreateDelegationToken(38): 0 to 2 [usable: 2], RenewDelegationToken(39): 0 to 2 [usable: 1], ExpireDelegationToken(40): 0 to 2 [usable: 1], DescribeDelegationToken(41): 0 to 2 [usable: 1], DeleteGroups(42): 0 to 2 [usable: 2], ElectLeaders(43): 0 to 2 [usable: 2], IncrementalAlterConfigs(44): 0 to 1 [usable: 1], AlterPartitionReassignments(45): 0 [usable: 0], 
ListPartitionReassignments(46): 0 [usable: 0], OffsetDelete(47): 0 [usable: 0])
16:05:58.128 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-583-con-1, groupId=583-con] Sending metadata request MetadataRequestData(topics=[MetadataRequestTopic(name='testkkk')], allowAutoTopicCreation=true, includeClusterAuthorizedOperations=false, includeTopicAuthorizedOperations=false) to node 192.168.2.99:9093 (id: -2 rack: null)
16:05:58.192 [main] DEBUG org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-583-con-1, groupId=583-con] Updating last seen epoch from null to 0 for partition testkkk-0
16:05:58.194 [main] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-583-con-1, groupId=583-con] Cluster ID: w_bDBUtGR4-0LpMJ3e4RvA
16:05:58.194 [main] DEBUG org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-583-con-1, groupId=583-con] Updated cluster metadata updateVersion 2 to MetadataCache{clusterId='w_bDBUtGR4-0LpMJ3e4RvA', nodes=[192.168.2.99:9092 (id: 0 rack: null), 192.168.2.99:9093 (id: 1 rack: null), 192.168.2.99:9094 (id: 2 rack: null)], partitions=[PartitionInfoAndEpoch{partitionInfo=Partition(topic = testkkk, partition = 0, leader = 2, replicas = [2], isr = [2], offlineReplicas = []), epoch=0}], controller=192.168.2.99:9092 (id: 0 rack: null)}
16:05:58.195 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-583-con-1, groupId=583-con] Received FindCoordinator response ClientResponse(receivedTimeMs=1620461158194, latencyMs=202, disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=consumer-583-con-1, correlationId=0), responseBody=FindCoordinatorResponseData(throttleTimeMs=0, errorCode=0, errorMessage='NONE', nodeId=0, host='192.168.2.99', port=9092))
16:05:58.195 [main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-583-con-1, groupId=583-con] Discovered group coordinator 192.168.2.99:9092 (id: 2147483647 rack: null)
16:05:58.195 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-583-con-1, groupId=583-con] Initiating connection to node 192.168.2.99:9092 (id: 2147483647 rack: null) using address /192.168.2.99
16:05:58.197 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-583-con-1, groupId=583-con] Executing onJoinPrepare with generation -1 and memberId
16:05:58.197 [kafka-coordinator-heartbeat-thread | 583-con] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-583-con-1, groupId=583-con] Heartbeat thread started
16:05:58.197 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-583-con-1, groupId=583-con] Disabling heartbeat thread
16:05:58.197 [main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-583-con-1, groupId=583-con] (Re-)joining group
16:05:58.198 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-583-con-1, groupId=583-con] Joining group with current subscription: [testkkk]
16:05:58.199 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-583-con-1, groupId=583-con] Sending JoinGroup (JoinGroupRequestData(groupId='583-con', sessionTimeoutMs=10000, rebalanceTimeoutMs=300000, memberId='', groupInstanceId=null, protocolType='consumer', protocols=[JoinGroupRequestProtocol(name='range', metadata=[0, 1, 0, 0, 0, 1, 0, 7, 116, 101, 115, 116, 107, 107, 107, -1, -1, -1, -1, 0, 0, 0, 0])])) to coordinator 192.168.2.99:9092 (id: 2147483647 rack: null)
16:05:58.201 [main] DEBUG org.apache.kafka.common.network.Selector - [Consumer clientId=consumer-583-con-1, groupId=583-con] Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node 2147483647
16:05:58.201 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-583-con-1, groupId=583-con] Completed connection to node 2147483647. Fetching API versions.
16:05:58.201 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-583-con-1, groupId=583-con] Initiating API versions fetch from node 2147483647.
16:05:58.205 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-583-con-1, groupId=583-con] Recorded API versions for node 2147483647: (Produce(0): 0 to 8 [usable: 8], Fetch(1): 0 to 11 [usable: 11], ListOffsets(2): 0 to 5 [usable: 5], Metadata(3): 0 to 9 [usable: 9], LeaderAndIsr(4): 0 to 4 [usable: 4], StopReplica(5): 0 to 2 [usable: 2], UpdateMetadata(6): 0 to 6 [usable: 6], ControlledShutdown(7): 0 to 3 [usable: 3], OffsetCommit(8): 0 to 8 [usable: 8], OffsetFetch(9): 0 to 7 [usable: 6], FindCoordinator(10): 0 to 3 [usable: 3], JoinGroup(11): 0 to 7 [usable: 6], Heartbeat(12): 0 to 4 [usable: 4], LeaveGroup(13): 0 to 4 [usable: 4], SyncGroup(14): 0 to 5 [usable: 4], DescribeGroups(15): 0 to 5 [usable: 5], ListGroups(16): 0 to 3 [usable: 3], SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 3 [usable: 3], CreateTopics(19): 0 to 5 [usable: 5], DeleteTopics(20): 0 to 4 [usable: 4], DeleteRecords(21): 0 to 1 [usable: 1], InitProducerId(22): 0 to 3 [usable: 2], OffsetForLeaderEpoch(23): 0 to 3 [usable: 3], AddPartitionsToTxn(24): 0 to 1 [usable: 1], AddOffsetsToTxn(25): 0 to 1 [usable: 1], EndTxn(26): 0 to 1 [usable: 1], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 to 3 [usable: 2], DescribeAcls(29): 0 to 2 [usable: 1], CreateAcls(30): 0 to 2 [usable: 1], DeleteAcls(31): 0 to 2 [usable: 1], DescribeConfigs(32): 0 to 2 [usable: 2], AlterConfigs(33): 0 to 1 [usable: 1], AlterReplicaLogDirs(34): 0 to 1 [usable: 1], DescribeLogDirs(35): 0 to 1 [usable: 1], SaslAuthenticate(36): 0 to 2 [usable: 1], CreatePartitions(37): 0 to 2 [usable: 1], CreateDelegationToken(38): 0 to 2 [usable: 2], RenewDelegationToken(39): 0 to 2 [usable: 1], ExpireDelegationToken(40): 0 to 2 [usable: 1], DescribeDelegationToken(41): 0 to 2 [usable: 1], DeleteGroups(42): 0 to 2 [usable: 2], ElectLeaders(43): 0 to 2 [usable: 2], IncrementalAlterConfigs(44): 0 to 1 [usable: 1], AlterPartitionReassignments(45): 0 [usable: 0], 
ListPartitionReassignments(46): 0 [usable: 0], OffsetDelete(47): 0 [usable: 0])
16:05:58.209 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-583-con-1, groupId=583-con] Disabling heartbeat thread
16:05:58.209 [main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-583-con-1, groupId=583-con] (Re-)joining group
16:05:58.209 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-583-con-1, groupId=583-con] Joining group with current subscription: [testkkk]
16:05:58.210 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-583-con-1, groupId=583-con] Sending JoinGroup (JoinGroupRequestData(groupId='583-con', sessionTimeoutMs=10000, rebalanceTimeoutMs=300000, memberId='consumer-583-con-1-d6828726-12a3-444d-8f25-f9319052eef2', groupInstanceId=null, protocolType='consumer', protocols=[JoinGroupRequestProtocol(name='range', metadata=[0, 1, 0, 0, 0, 1, 0, 7, 116, 101, 115, 116, 107, 107, 107, -1, -1, -1, -1, 0, 0, 0, 0])])) to coordinator 192.168.2.99:9092 (id: 2147483647 rack: null)
16:05:58.222 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-583-con-1, groupId=583-con] Received successful JoinGroup response: JoinGroupResponseData(throttleTimeMs=0, errorCode=0, generationId=1, protocolName='range', leader='consumer-583-con-1-d6828726-12a3-444d-8f25-f9319052eef2', memberId='consumer-583-con-1-d6828726-12a3-444d-8f25-f9319052eef2', members=[JoinGroupResponseMember(memberId='consumer-583-con-1-d6828726-12a3-444d-8f25-f9319052eef2', groupInstanceId=null, metadata=[0, 1, 0, 0, 0, 1, 0, 7, 116, 101, 115, 116, 107, 107, 107, -1, -1, -1, -1, 0, 0, 0, 0])])
16:05:58.222 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-583-con-1, groupId=583-con] Performing assignment using strategy range with subscriptions {consumer-583-con-1-d6828726-12a3-444d-8f25-f9319052eef2=org.apache.kafka.clients.consumer.ConsumerPartitionAssignor$Subscription@18bc345}
16:05:58.224 [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-583-con-1, groupId=583-con] Finished assignment for group at generation 1: {consumer-583-con-1-d6828726-12a3-444d-8f25-f9319052eef2=Assignment(partitions=[testkkk-0])}
16:05:58.225 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-583-con-1, groupId=583-con] Sending leader SyncGroup to coordinator 192.168.2.99:9092 (id: 2147483647 rack: null) at generation Generation{generationId=1, memberId='consumer-583-con-1-d6828726-12a3-444d-8f25-f9319052eef2', protocol='range'}: SyncGroupRequestData(groupId='583-con', generationId=1, memberId='consumer-583-con-1-d6828726-12a3-444d-8f25-f9319052eef2', groupInstanceId=null, assignments=[SyncGroupRequestAssignment(memberId='consumer-583-con-1-d6828726-12a3-444d-8f25-f9319052eef2', assignment=[0, 1, 0, 0, 0, 1, 0, 7, 116, 101, 115, 116, 107, 107, 107, 0, 0, 0, 1, 0, 0, 0, 0, -1, -1, -1, -1])])
16:05:58.238 [main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-583-con-1, groupId=583-con] Successfully joined group with generation 1
16:05:58.238 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-583-con-1, groupId=583-con] Enabling heartbeat thread
16:05:58.239 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-583-con-1, groupId=583-con] Executing onJoinComplete with generation 1 and memberId consumer-583-con-1-d6828726-12a3-444d-8f25-f9319052eef2
16:05:58.241 [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-583-con-1, groupId=583-con] Adding newly assigned partitions: testkkk-0
16:05:58.247 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-583-con-1, groupId=583-con] Fetching committed offsets for partitions: [testkkk-0]
16:05:58.251 [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-583-con-1, groupId=583-con] Found no committed offset for partition testkkk-0
16:05:58.253 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-583-con-1, groupId=583-con] Sending ListOffsetRequest (type=ListOffsetRequest, replicaId=-1, partitionTimestamps={testkkk-0={timestamp: -1, maxNumOffsets: 1, currentLeaderEpoch: Optional[0]}}, isolationLevel=READ_UNCOMMITTED) to broker 192.168.2.99:9094 (id: 2 rack: null)
16:05:58.256 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-583-con-1, groupId=583-con] Initiating connection to node 192.168.2.99:9094 (id: 2 rack: null) using address /192.168.2.99
16:05:58.258 [main] DEBUG org.apache.kafka.common.network.Selector - [Consumer clientId=consumer-583-con-1, groupId=583-con] Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node 2
16:05:58.258 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-583-con-1, groupId=583-con] Completed connection to node 2. Fetching API versions.
16:05:58.258 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-583-con-1, groupId=583-con] Initiating API versions fetch from node 2.
16:05:58.324 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-583-con-1, groupId=583-con] Recorded API versions for node 2: (Produce(0): 0 to 8 [usable: 8], Fetch(1): 0 to 11 [usable: 11], ListOffsets(2): 0 to 5 [usable: 5], Metadata(3): 0 to 9 [usable: 9], LeaderAndIsr(4): 0 to 4 [usable: 4], StopReplica(5): 0 to 2 [usable: 2], UpdateMetadata(6): 0 to 6 [usable: 6], ControlledShutdown(7): 0 to 3 [usable: 3], OffsetCommit(8): 0 to 8 [usable: 8], OffsetFetch(9): 0 to 7 [usable: 6], FindCoordinator(10): 0 to 3 [usable: 3], JoinGroup(11): 0 to 7 [usable: 6], Heartbeat(12): 0 to 4 [usable: 4], LeaveGroup(13): 0 to 4 [usable: 4], SyncGroup(14): 0 to 5 [usable: 4], DescribeGroups(15): 0 to 5 [usable: 5], ListGroups(16): 0 to 3 [usable: 3], SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 3 [usable: 3], CreateTopics(19): 0 to 5 [usable: 5], DeleteTopics(20): 0 to 4 [usable: 4], DeleteRecords(21): 0 to 1 [usable: 1], InitProducerId(22): 0 to 3 [usable: 2], OffsetForLeaderEpoch(23): 0 to 3 [usable: 3], AddPartitionsToTxn(24): 0 to 1 [usable: 1], AddOffsetsToTxn(25): 0 to 1 [usable: 1], EndTxn(26): 0 to 1 [usable: 1], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 to 3 [usable: 2], DescribeAcls(29): 0 to 2 [usable: 1], CreateAcls(30): 0 to 2 [usable: 1], DeleteAcls(31): 0 to 2 [usable: 1], DescribeConfigs(32): 0 to 2 [usable: 2], AlterConfigs(33): 0 to 1 [usable: 1], AlterReplicaLogDirs(34): 0 to 1 [usable: 1], DescribeLogDirs(35): 0 to 1 [usable: 1], SaslAuthenticate(36): 0 to 2 [usable: 1], CreatePartitions(37): 0 to 2 [usable: 1], CreateDelegationToken(38): 0 to 2 [usable: 2], RenewDelegationToken(39): 0 to 2 [usable: 1], ExpireDelegationToken(40): 0 to 2 [usable: 1], DescribeDelegationToken(41): 0 to 2 [usable: 1], DeleteGroups(42): 0 to 2 [usable: 2], ElectLeaders(43): 0 to 2 [usable: 2], IncrementalAlterConfigs(44): 0 to 1 [usable: 1], AlterPartitionReassignments(45): 0 [usable: 0], 
ListPartitionReassignments(46): 0 [usable: 0], OffsetDelete(47): 0 [usable: 0])
16:05:58.330 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-583-con-1, groupId=583-con] Handling ListOffsetResponse response for testkkk-0. Fetched offset 7, timestamp -1
16:05:58.331 [main] DEBUG org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-583-con-1, groupId=583-con] Not replacing existing epoch 0 with new epoch 0 for partition testkkk-0
16:05:58.331 [main] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-583-con-1, groupId=583-con] Resetting offset for partition testkkk-0 to offset 7.
16:05:58.334 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-583-con-1, groupId=583-con] Added READ_UNCOMMITTED fetch request for partition testkkk-0 at position FetchPosition{offset=7, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=192.168.2.99:9094 (id: 2 rack: null), epoch=0}} to node 192.168.2.99:9094 (id: 2 rack: null)
16:05:58.334 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-583-con-1, groupId=583-con] Built full fetch (sessionId=INVALID, epoch=INITIAL) for node 2 with 1 partition(s).
16:05:58.335 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-583-con-1, groupId=583-con] Sending READ_UNCOMMITTED FullFetchRequest(testkkk-0) to broker 192.168.2.99:9094 (id: 2 rack: null)
16:05:58.848 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-583-con-1, groupId=583-con] Node 2 sent a full fetch response that created a new incremental fetch session 1123350265 with 1 response partition(s)
16:05:58.849 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-583-con-1, groupId=583-con] Fetch READ_UNCOMMITTED at offset 7 for partition testkkk-0 returned fetch data (error=NONE, highWaterMark=7, lastStableOffset = 7, logStartOffset = 0, preferredReadReplica = absent, abortedTransactions = null, recordsSizeInBytes=0)
16:05:58.851 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-583-con-1, groupId=583-con] Added READ_UNCOMMITTED fetch request for partition testkkk-0 at position FetchPosition{offset=7, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=192.168.2.99:9094 (id: 2 rack: null), epoch=0}} to node 192.168.2.99:9094 (id: 2 rack: null)
16:05:58.852 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-583-con-1, groupId=583-con] Built incremental fetch (sessionId=1123350265, epoch=1) for node 2. Added 0 partition(s), altered 0 partition(s), removed 0 partition(s) out of 1 partition(s)
16:05:58.852 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-583-con-1, groupId=583-con] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), implied=(testkkk-0)) to broker 192.168.2.99:9094 (id: 2 rack: null)
16:05:59.703 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-583-con-1, groupId=583-con] Node 2 sent an incremental fetch response for session 1123350265 with 0 response partition(s), 1 implied partition(s)
16:05:59.703 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-583-con-1, groupId=583-con] Added READ_UNCOMMITTED fetch request for partition testkkk-0 at position FetchPosition{offset=7, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=192.168.2.99:9094 (id: 2 rack: null), epoch=0}} to node 192.168.2.99:9094 (id: 2 rack: null)
16:05:59.703 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-583-con-1, groupId=583-con] Built incremental fetch (sessionId=1123350265, epoch=2) for node 2. Added 0 partition(s), altered 0 partition(s), removed 0 partition(s) out of 1 partition(s)
16:05:59.703 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-583-con-1, groupId=583-con] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), implied=(testkkk-0)) to broker 192.168.2.99:9094 (id: 2 rack: null)
16:06:00.209 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-583-con-1, groupId=583-con] Node 2 sent an incremental fetch response for session 1123350265 with 0 response partition(s), 1 implied partition(s)
16:06:00.209 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-583-con-1, groupId=583-con] Added READ_UNCOMMITTED fetch request for partition testkkk-0 at position FetchPosition{offset=7, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=192.168.2.99:9094 (id: 2 rack: null), epoch=0}} to node 192.168.2.99:9094 (id: 2 rack: null)
16:06:00.209 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-583-con-1, groupId=583-con] Built incremental fetch (sessionId=1123350265, epoch=3) for node 2. Added 0 partition(s), altered 0 partition(s), removed 0 partition(s) out of 1 partition(s)
16:06:00.209 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-583-con-1, groupId=583-con] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), implied=(testkkk-0)) to broker 192.168.2.99:9094 (id: 2 rack: null)
Disconnected from the target VM, address: '127.0.0.1:0', transport: 'socket'
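Reading the fetch lines in the log above: the consumer found no committed offset for testkkk-0, was reset to offset 7 (the config dump shows auto.offset.reset = latest), and the broker answered with highWaterMark=7 and recordsSizeInBytes=0. That pattern suggests the consumer is sitting at the end of the partition and waiting for new records, not failing. A minimal sketch of that check, assuming the DEBUG line format shown in this log; `FetchLogCheck` and `unreadRecords` are hypothetical names for illustration, not part of the Kafka client:

```java
import java.util.OptionalLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: it only inspects the text of a Fetcher DEBUG log line.
final class FetchLogCheck {
    // Assumes the line format seen in the log above, e.g.
    // "Fetch READ_UNCOMMITTED at offset 7 for partition testkkk-0 returned fetch data (error=NONE, highWaterMark=7, ..."
    private static final Pattern FETCH_LINE = Pattern.compile(
            "at offset (\\d+) for partition \\S+ returned fetch data \\(.*?highWaterMark=(\\d+)");

    // Returns highWaterMark minus the fetch offset, or empty if the line is not a fetch-result line.
    static OptionalLong unreadRecords(String logLine) {
        Matcher m = FETCH_LINE.matcher(logLine);
        if (!m.find()) {
            return OptionalLong.empty();
        }
        long position = Long.parseLong(m.group(1));
        long highWaterMark = Long.parseLong(m.group(2));
        return OptionalLong.of(highWaterMark - position);
    }

    public static void main(String[] args) {
        String line = "Fetch READ_UNCOMMITTED at offset 7 for partition testkkk-0 returned fetch data "
                + "(error=NONE, highWaterMark=7, lastStableOffset = 7, logStartOffset = 0, "
                + "preferredReadReplica = absent, abortedTransactions = null, recordsSizeInBytes=0)";
        // A result of 0 means the consumer position is already at the end of the partition.
        System.out.println("unread records: " + unreadRecords(line).getAsLong());
    }
}
```

A result of 0 for the line from this log means there is nothing left to fetch at the consumer's position, so `poll` keeps returning empty batches until a producer writes a new record to testkkk.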
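The messages may have been taken by another consumer (the consumer program you started inside your Windows VM).
1. Stop the one running on Windows.
2. Use a different consumer group name.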
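One thing to keep in mind when switching to a new group name: a group with no committed offset starts from wherever auto.offset.reset points. The config dump in the question's log shows auto.offset.reset = latest, so a fresh group would only receive records produced after it joins. Below is a sketch of the consumer properties, assuming you want the new group to read the records already in the topic. The class name `ReplayConsumerProps` and the group name `583-con-replay` are made up for illustration; the property keys are the ones printed in the ConsumerConfig dump.

```java
import java.util.Properties;

// Hypothetical helper that builds consumer settings using plain config keys.
final class ReplayConsumerProps {
    static Properties build(String bootstrapServers, String groupId) {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", bootstrapServers);
        p.setProperty("group.id", groupId);
        p.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        p.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        p.setProperty("enable.auto.commit", "false");
        // With no committed offset for the group, start from the earliest available record
        // instead of the default "latest".
        p.setProperty("auto.offset.reset", "earliest");
        return p;
    }

    public static void main(String[] args) {
        // The group name is an assumption; any name that has never committed offsets works.
        Properties p = build("192.168.2.99:9092,192.168.2.99:9093,192.168.2.99:9094", "583-con-replay");
        System.out.println("group.id = " + p.getProperty("group.id"));
        System.out.println("auto.offset.reset = " + p.getProperty("auto.offset.reset"));
        // These properties would then be passed to new KafkaConsumer<String, String>(p).
    }
}
```

Note that auto.offset.reset only applies when the group has no committed offset (or the committed one is out of range), so reusing a group that has already committed will not replay old records.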
Still no luck; it just keeps looping forever.
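1. On the Kafka side, check the state of the consumers.
## List consumer groups, new consumer (supports version 0.10+)
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
## Show the consumption details of a consumer group (only for offsets stored in ZooKeeper)
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group test
2. Check whether the producer is continuously sending messages to this topic.
3. Check the Kafka logs for any exceptions.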
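Sorry, I gave the wrong version of the command for showing consumer details.
## Show the consumption details of a consumer group (version 0.10.1.0+)
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group
Reference: https://www.orchome.com/454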
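The consumer group name does show up in Kafka's consumer group list, but describing that group prints:
consumer group 'groupp' has no active members
The producer is not continuously sending messages to this topic.
My Kafka cluster runs on Windows, so these are the two commands I used:
List the Kafka consumer groups: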
kafka-consumer-groups.bat --bootstrap-server 192.168.2.99:9092 --list
查看 kafka 中某一个消费者组的消费情况:
kafka-consumer-groups.bat --bootstrap-server 192.168.2.99:9092 --group huangezuming --describe
controller.log信息如下:
[2021-05-08 18:00:32,839] INFO [Controller id=0] Processing automatic preferred replica leader election (kafka.controller.KafkaController) [2021-05-08 18:00:32,842] TRACE [Controller id=0] Checking need to trigger auto leader balancing (kafka.controller.KafkaController) [2021-05-08 18:00:32,843] DEBUG [Controller id=0] Topics not in preferred replica for broker 2 Map() (kafka.controller.KafkaController) [2021-05-08 18:00:32,843] TRACE [Controller id=0] Leader imbalance ratio for broker 2 is 0.0 (kafka.controller.KafkaController) [2021-05-08 18:00:32,843] DEBUG [Controller id=0] Topics not in preferred replica for broker 1 Map() (kafka.controller.KafkaController) [2021-05-08 18:00:32,843] TRACE [Controller id=0] Leader imbalance ratio for broker 1 is 0.0 (kafka.controller.KafkaController) [2021-05-08 18:00:32,843] DEBUG [Controller id=0] Topics not in preferred replica for broker 0 Map() (kafka.controller.KafkaController) [2021-05-08 18:00:32,843] TRACE [Controller id=0] Leader imbalance ratio for broker 0 is 0.0 (kafka.controller.KafkaController) [2021-05-08 18:03:32,765] INFO [Controller id=0] New topics: [Set(test588)], deleted topics: [Set()], new partition replica assignment [Map(test588-0 -> ReplicaAssignment(replicas=0, addingReplicas=, removingReplicas=))] (kafka.controller.KafkaController) [2021-05-08 18:03:32,765] INFO [Controller id=0] New partition creation callback for test588-0 (kafka.controller.KafkaController) [2021-05-08 18:03:33,009] INFO [RequestSendThread controllerId=0] Controller 0 connected to 192.168.2.99:9092 (id: 0 rack: null) for sending state change requests (kafka.controller.RequestSendThread) [2021-05-08 18:03:33,017] INFO [RequestSendThread controllerId=0] Controller 0 connected to 192.168.2.99:9093 (id: 1 rack: null) for sending state change requests (kafka.controller.RequestSendThread) [2021-05-08 18:03:33,021] INFO [RequestSendThread controllerId=0] Controller 0 connected to 192.168.2.99:9094 (id: 2 rack: null) for sending state 
change requests (kafka.controller.RequestSendThread) [2021-05-08 18:05:32,844] INFO [Controller id=0] Processing automatic preferred replica leader election (kafka.controller.KafkaController) [2021-05-08 18:05:32,844] TRACE [Controller id=0] Checking need to trigger auto leader balancing (kafka.controller.KafkaController) [2021-05-08 18:05:32,845] DEBUG [Controller id=0] Topics not in preferred replica for broker 2 Map() (kafka.controller.KafkaController) [2021-05-08 18:05:32,845] TRACE [Controller id=0] Leader imbalance ratio for broker 2 is 0.0 (kafka.controller.KafkaController) [2021-05-08 18:05:32,845] DEBUG [Controller id=0] Topics not in preferred replica for broker 1 Map() (kafka.controller.KafkaController) [2021-05-08 18:05:32,845] TRACE [Controller id=0] Leader imbalance ratio for broker 1 is 0.0 (kafka.controller.KafkaController) [2021-05-08 18:05:32,845] DEBUG [Controller id=0] Topics not in preferred replica for broker 0 Map() (kafka.controller.KafkaController) [2021-05-08 18:05:32,845] TRACE [Controller id=0] Leader imbalance ratio for broker 0 is 0.0 (kafka.controller.KafkaController) [2021-05-08 18:10:32,846] INFO [Controller id=0] Processing automatic preferred replica leader election (kafka.controller.KafkaController) [2021-05-08 18:10:32,846] TRACE [Controller id=0] Checking need to trigger auto leader balancing (kafka.controller.KafkaController) [2021-05-08 18:10:32,847] DEBUG [Controller id=0] Topics not in preferred replica for broker 2 Map() (kafka.controller.KafkaController) [2021-05-08 18:10:32,847] TRACE [Controller id=0] Leader imbalance ratio for broker 2 is 0.0 (kafka.controller.KafkaController) [2021-05-08 18:10:32,847] DEBUG [Controller id=0] Topics not in preferred replica for broker 1 Map() (kafka.controller.KafkaController) [2021-05-08 18:10:32,847] TRACE [Controller id=0] Leader imbalance ratio for broker 1 is 0.0 (kafka.controller.KafkaController) [2021-05-08 18:10:32,847] DEBUG [Controller id=0] Topics not in preferred replica 
for broker 0 Map() (kafka.controller.KafkaController) [2021-05-08 18:10:32,847] TRACE [Controller id=0] Leader imbalance ratio for broker 0 is 0.0 (kafka.controller.KafkaController) [2021-05-08 18:15:33,001] INFO [Controller id=0] Processing automatic preferred replica leader election (kafka.controller.KafkaController) [2021-05-08 18:15:33,001] TRACE [Controller id=0] Checking need to trigger auto leader balancing (kafka.controller.KafkaController) [2021-05-08 18:15:33,002] DEBUG [Controller id=0] Topics not in preferred replica for broker 2 Map() (kafka.controller.KafkaController) [2021-05-08 18:15:33,002] TRACE [Controller id=0] Leader imbalance ratio for broker 2 is 0.0 (kafka.controller.KafkaController) [2021-05-08 18:15:33,002] DEBUG [Controller id=0] Topics not in preferred replica for broker 1 Map() (kafka.controller.KafkaController) [2021-05-08 18:15:33,002] TRACE [Controller id=0] Leader imbalance ratio for broker 1 is 0.0 (kafka.controller.KafkaController) [2021-05-08 18:15:33,002] DEBUG [Controller id=0] Topics not in preferred replica for broker 0 Map() (kafka.controller.KafkaController) [2021-05-08 18:15:33,002] TRACE [Controller id=0] Leader imbalance ratio for broker 0 is 0.0 (kafka.controller.KafkaController) [2021-05-08 18:20:33,003] INFO [Controller id=0] Processing automatic preferred replica leader election (kafka.controller.KafkaController) [2021-05-08 18:20:33,003] TRACE [Controller id=0] Checking need to trigger auto leader balancing (kafka.controller.KafkaController) [2021-05-08 18:20:33,301] DEBUG [Controller id=0] Topics not in preferred replica for broker 2 Map() (kafka.controller.KafkaController) [2021-05-08 18:20:33,301] TRACE [Controller id=0] Leader imbalance ratio for broker 2 is 0.0 (kafka.controller.KafkaController) [2021-05-08 18:20:33,301] DEBUG [Controller id=0] Topics not in preferred replica for broker 1 Map() (kafka.controller.KafkaController) [2021-05-08 18:20:33,301] TRACE [Controller id=0] Leader imbalance ratio for broker 
1 is 0.0 (kafka.controller.KafkaController) [2021-05-08 18:20:33,301] DEBUG [Controller id=0] Topics not in preferred replica for broker 0 Map() (kafka.controller.KafkaController) [2021-05-08 18:20:33,301] TRACE [Controller id=0] Leader imbalance ratio for broker 0 is 0.0 (kafka.controller.KafkaController) [2021-05-08 18:25:33,302] INFO [Controller id=0] Processing automatic preferred replica leader election (kafka.controller.KafkaController) [2021-05-08 18:25:33,302] TRACE [Controller id=0] Checking need to trigger auto leader balancing (kafka.controller.KafkaController) [2021-05-08 18:25:33,303] DEBUG [Controller id=0] Topics not in preferred replica for broker 2 Map() (kafka.controller.KafkaController) [2021-05-08 18:25:33,303] TRACE [Controller id=0] Leader imbalance ratio for broker 2 is 0.0 (kafka.controller.KafkaController) [2021-05-08 18:25:33,303] DEBUG [Controller id=0] Topics not in preferred replica for broker 1 Map() (kafka.controller.KafkaController) [2021-05-08 18:25:33,303] TRACE [Controller id=0] Leader imbalance ratio for broker 1 is 0.0 (kafka.controller.KafkaController) [2021-05-08 18:25:33,303] DEBUG [Controller id=0] Topics not in preferred replica for broker 0 Map() (kafka.controller.KafkaController) [2021-05-08 18:25:33,303] TRACE [Controller id=0] Leader imbalance ratio for broker 0 is 0.0 (kafka.controller.KafkaController)
Once the consumer is up, have the producer send a few messages.
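I started the consumer first and it kept looping. Then I had the producer send a few messages, and this time the consumer did receive them, but once it had consumed them it went straight back to looping.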
topic = test588 offset = 3 value = vf
18:39:18.796 [main] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-groupppi-1, groupId=groupppi] Committing offsets: {test588-0=OffsetAndMetadata{offset=4, leaderEpoch=0, metadata=''}}
18:39:18.796 [main] DEBUG org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-groupppi-1, groupId=groupppi] Not replacing existing epoch 0 with new epoch 0 for partition test588-0
18:39:18.821 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-groupppi-1, groupId=groupppi] Committed offset 4 for partition test588-0
18:39:19.366 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-groupppi-1, groupId=groupppi] Node 0 sent an incremental fetch response for session 1099671568 with 0 response partition(s), 1 implied partition(s)
18:39:19.366 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-groupppi-1, groupId=groupppi] Added READ_UNCOMMITTED fetch request for partition test588-0 at position FetchPosition{offset=4, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=192.168.2.99:9092 (id: 0 rack: null), epoch=0}} to node 192.168.2.99:9092 (id: 0 rack: null)
18:39:19.366 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-groupppi-1, groupId=groupppi] Built incremental fetch (sessionId=1099671568, epoch=80) for node 0. Added 0 partition(s), altered 0 partition(s), removed 0 partition(s) out of 1 partition(s)
18:39:19.366 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-groupppi-1, groupId=groupppi] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), implied=(test588-0)) to broker 192.168.2.99:9092 (id: 0 rack: null)
18:39:19.556 [kafka-coordinator-heartbeat-thread | groupppi] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-groupppi-1, groupId=groupppi] Sending Heartbeat request to coordinator 192.168.2.99:9092 (id: 2147483647 rack: null)
18:39:19.592 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-groupppi-1, groupId=groupppi] Received successful Heartbeat response
Isn't that normal?
A consumer is supposed to keep looping: it polls for messages and then processes them.
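It worked just now because I started the consumer first on a topic that did not exist yet (it kept looping), and only then, while the consumer was looping, started the producer to send to that topic. That is when the messages were consumed.
But I then tried the opposite: I had the producer send to a brand-new topic first, and only after it finished did I start the consumer. This time the consumer received nothing. As far as I can tell those messages have never been consumed, so why can't they be consumed now? Surely that is not normal?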
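It is like subscribing to a newspaper: the publisher only knows about you from the moment you subscribe, and only later issues are delivered to you (earlier ones are not), so you need to start the consumer first.
You can try stopping the consumer, sending a few more messages, and then restarting the consumer. You will then receive what the producer sent, because you (the consumer group) have already registered with the publisher, so it has a record of you and sends you the issues you missed.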
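A note on the newspaper analogy, hedged because the broker-side state in this thread was never shown. For the standard consumer, the "record" the publisher keeps is the group's committed offset. A group that has a committed offset for a partition resumes from it. A group that has none falls back to the consumer setting auto.offset.reset, whose default is latest, meaning it starts at the end of the log and only sees messages produced afterwards. That would match a consumer started after the producer seeing nothing. It also suggests that the "missed issues" are only delivered on restart if the earlier run actually committed an offset. The toy model below is not Kafka code; the class, enum and method names are made up for illustration.

```java
import java.util.OptionalLong;

/** Toy model (not Kafka code) of how a consumer group picks its starting offset for one partition. */
final class StartOffsetModel {

    /** Mirrors two of the values accepted by the consumer setting auto.offset.reset. */
    enum ResetPolicy { EARLIEST, LATEST }

    /**
     * @param committed      offset the group committed earlier, or empty if it never committed one
     * @param logStartOffset oldest offset still stored in the partition
     * @param logEndOffset   offset that the next produced message will receive
     * @param policy         what to do when there is no committed offset
     */
    static long startOffset(OptionalLong committed, long logStartOffset,
                            long logEndOffset, ResetPolicy policy) {
        if (committed.isPresent()) {
            return committed.getAsLong(); // resume where the group left off
        }
        // No committed offset: the reset policy decides where to begin.
        return policy == ResetPolicy.EARLIEST ? logStartOffset : logEndOffset;
    }

    public static void main(String[] args) {
        // Hypothetical partition holding 3 messages (offsets 0..2), so the log end offset is 3.
        // New group, default policy: starts at the end and sees none of the 3 messages.
        System.out.println(startOffset(OptionalLong.empty(), 0, 3, ResetPolicy.LATEST));   // 3
        // New group, earliest: starts at the beginning and sees all 3.
        System.out.println(startOffset(OptionalLong.empty(), 0, 3, ResetPolicy.EARLIEST)); // 0
        // Group that committed offset 1 before stopping: resumes at 1 regardless of policy.
        System.out.println(startOffset(OptionalLong.of(1), 0, 3, ResetPolicy.LATEST));     // 1
    }
}
```

The model leaves out the case where a committed offset is out of range, which also triggers the reset policy.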
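No luck, I just tried it and it still doesn't consume anything.
When my producer sends to a topic that did not exist before, the send succeeds, but the IDEA console prints the following. Could the failure to consume be related to this?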
2021-05-08 20:09:18.109 WARN 22532 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Error while fetching metadata with correlation id 31 : {test591=LEADER_NOT_AVAILABLE}
2021-05-08 20:09:18.283 WARN 22532 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Error while fetching metadata with correlation id 32 : {test591=LEADER_NOT_AVAILABLE}
2021-05-08 20:09:18.421 WARN 22532 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Error while fetching metadata with correlation id 33 : {test591=LEADER_NOT_AVAILABLE}
2021-05-08 20:09:18.530 WARN 22532 --- [ad | producer-1] o.a.k.clients.producer.internals.Sender : [Producer clientId=producer-1] Got error produce response with correlation id 35 on topic-partition test591-0, retrying (2147483646 attempts left). Error: UNKNOWN_TOPIC_OR_PARTITION
2021-05-08 20:09:18.531 WARN 22532 --- [ad | producer-1] o.a.k.clients.producer.internals.Sender : [Producer clientId=producer-1] Received unknown topic or partition error in produce request on partition test591-0. The topic-partition may not exist or the user may not have Describe access to it
2021-05-08 20:09:18.633 WARN 22532 --- [ad | producer-1] o.a.k.clients.producer.internals.Sender : [Producer clientId=producer-1] Got error produce response with correlation id 37 on topic-partition test591-0, retrying (2147483645 attempts left). Error: UNKNOWN_TOPIC_OR_PARTITION
2021-05-08 20:09:18.633 WARN 22532 --- [ad | producer-1] o.a.k.clients.producer.internals.Sender : [Producer clientId=producer-1] Received unknown topic or partition error in produce request on partition test591-0. The topic-partition may not exist or the user may not have Describe access to it
2021-05-08 20:09:18.740 WARN 22532 --- [ad | producer-1] o.a.k.clients.producer.internals.Sender : [Producer clientId=producer-1] Got error produce response with correlation id 39 on topic-partition test591-0, retrying (2147483644 attempts left). Error: UNKNOWN_TOPIC_OR_PARTITION
2021-05-08 20:09:18.740 WARN 22532 --- [ad | producer-1] o.a.k.clients.producer.internals.Sender : [Producer clientId=producer-1] Received unknown topic or partition error in produce request on partition test591-0. The topic-partition may not exist or the user may not have Describe access to it
2021-05-08 20:09:18.879 INFO 22532 --- [ad | producer-1] cn.xp.producer.KafkaProduct :
消息发送成功 (message sent successfully):
SendResult [producerRecord=ProducerRecord(topic=test591, partition=0, headers=RecordHeaders(headers = [], isReadOnly = true), key=testKT, value=ggg, timestamp=1620475757949), recordMetadata=test591-0@0]
That is only a warning; the topic then gets created and the send eventually succeeds.
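I just tried again: I first ran the consumer against a topic that does not exist, then stopped the consumer, then had the producer send to that topic, and finally started the consumer again on that topic. It still receives nothing, and switching to a different consumer group name doesn't help either.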
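Changing the consumer group name certainly won't help. Besides, this sequence of steps almost never comes up in real-world use.
Don't get hung up on it.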
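For readers who do run into this sequence, the consumer setting that usually governs it is auto.offset.reset. Below is a sketch of the question's consumer configuration with that one key added. It uses plain string keys so it compiles without the Kafka client on the classpath. The class name and the group name 583-con-replay are hypothetical, and earliest only takes effect for a group that has no committed offset for the partition, so whether it helps here depends on what the earlier runs committed.

```java
import java.util.Properties;

/** Sketch: the consumer settings from the question plus auto.offset.reset=earliest. */
final class EarliestConsumerProps {

    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Same as the question: offsets are committed by the application, not automatically.
        props.setProperty("enable.auto.commit", "false");
        // Added key: a group with no committed offset starts from the oldest retained message
        // instead of the default "latest".
        props.setProperty("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        // Broker list copied from the question; the group name is a hypothetical fresh group.
        Properties props = build(
                "192.168.2.99:9092,192.168.2.99:9093,192.168.2.99:9094", "583-con-replay");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The resulting Properties object can be passed to new KafkaConsumer<String, String>(props) in place of the one built in the question.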
OK, thanks a lot!