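In the "Kafka Consumer Java Client" chapter, I copied the code under "Automatic Offset Committing" in the "Examples" section and ran it as-is, but I don't get the result I expect: the output is nothing but log messages. I have no idea why, or what went wrong that keeps the expected output from appearing.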
The code comes from: https://www.orchome.com/451#item-5-1
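For context, the settings my consumer ran with can be read off the ConsumerConfig dump in the log. The following is only a sketch reconstructed from those logged values, not the exact code from the tutorial; the class name `ConsumerProps` and the `build()` helper are hypothetical names used here for illustration.

```java
import java.util.Properties;

// Hypothetical helper: rebuilds the consumer settings that appear in the
// ConsumerConfig dump of the log. Values are copied from the log output.
public class ConsumerProps {

    public static Properties build() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "test");
        // Auto-commit is on, with offsets committed every second.
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "1000");
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // Print the settings so they can be compared against the log dump.
        build().forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

In my run these properties were passed to a `KafkaConsumer<String, String>`, which then subscribed to `my-topic`, as the "Subscribed to topic(s)" line in the log shows.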
Below is an excerpt of the log output:
17:16:25.105 [main] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 1000
auto.offset.reset = latest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id = consumer-test-1
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = test
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 10000
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.2
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
17:16:25.112 [main] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-test-1, groupId=test] Initializing the Kafka consumer
17:16:26.008 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 2.8.0
17:16:26.009 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: ebb1d6e21cc92130
17:16:26.009 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1621329386004
17:16:26.012 [main] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-test-1, groupId=test] Kafka consumer initialized
17:16:26.013 [main] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-test-1, groupId=test] Subscribed to topic(s): my-topic
17:16:26.015 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-test-1, groupId=test] Sending FindCoordinator request to broker localhost:9092 (id: -1 rack: null)
17:16:26.352 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Initiating connection to node localhost:9092 (id: -1 rack: null) using address localhost/127.0.0.1
17:16:26.373 [main] DEBUG org.apache.kafka.common.network.Selector - [Consumer clientId=consumer-test-1, groupId=test] Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -1
17:16:26.375 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Completed connection to node -1. Fetching API versions.
17:16:26.375 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Initiating API versions fetch from node -1.
17:16:26.398 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Sending API_VERSIONS request with header RequestHeader(apiKey=API_VERSIONS, apiVersion=3, clientId=consumer-test-1, correlationId=1) and timeout 30000 to node -1: ApiVersionsRequestData(clientSoftwareName='apache-kafka-java', clientSoftwareVersion='2.8.0')
17:16:26.442 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Received API_VERSIONS response from node -1 for request with header RequestHeader(apiKey=API_VERSIONS, apiVersion=3, clientId=consumer-test-1, correlationId=1): ApiVersionsResponseData(errorCode=35, apiKeys=[], throttleTimeMs=0, supportedFeatures=[], finalizedFeaturesEpoch=-1, finalizedFeatures=[])
17:16:26.510 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Initiating API versions fetch from node -1.
17:16:26.510 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Sending API_VERSIONS request with header RequestHeader(apiKey=API_VERSIONS, apiVersion=0, clientId=consumer-test-1, correlationId=2) and timeout 30000 to node -1: ApiVersionsRequestData(clientSoftwareName='apache-kafka-java', clientSoftwareVersion='2.8.0')
17:16:26.512 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Received API_VERSIONS response from node -1 for request with header RequestHeader(apiKey=API_VERSIONS, apiVersion=0, clientId=consumer-test-1, correlationId=2): ApiVersionsResponseData(errorCode=0, apiKeys=[ApiVersion(apiKey=0, minVersion=0, maxVersion=3), ApiVersion(apiKey=1, minVersion=0, maxVersion=5), ApiVersion(apiKey=2, minVersion=0, maxVersion=2), ApiVersion(apiKey=3, minVersion=0, maxVersion=4), ApiVersion(apiKey=4, minVersion=0, maxVersion=0), ApiVersion(apiKey=5, minVersion=0, maxVersion=0), ApiVersion(apiKey=6, minVersion=0, maxVersion=3), ApiVersion(apiKey=7, minVersion=1, maxVersion=1), ApiVersion(apiKey=8, minVersion=0, maxVersion=3), ApiVersion(apiKey=9, minVersion=0, maxVersion=3), ApiVersion(apiKey=10, minVersion=0, maxVersion=1), ApiVersion(apiKey=11, minVersion=0, maxVersion=2), ApiVersion(apiKey=12, minVersion=0, maxVersion=1), ApiVersion(apiKey=13, minVersion=0, maxVersion=1), ApiVersion(apiKey=14, minVersion=0, maxVersion=1), ApiVersion(apiKey=15, minVersion=0, maxVersion=1), ApiVersion(apiKey=16, minVersion=0, maxVersion=1), ApiVersion(apiKey=17, minVersion=0, maxVersion=0), ApiVersion(apiKey=18, minVersion=0, maxVersion=1), ApiVersion(apiKey=19, minVersion=0, maxVersion=2), ApiVersion(apiKey=20, minVersion=0, maxVersion=1), ApiVersion(apiKey=21, minVersion=0, maxVersion=0), ApiVersion(apiKey=22, minVersion=0, maxVersion=0), ApiVersion(apiKey=23, minVersion=0, maxVersion=0), ApiVersion(apiKey=24, minVersion=0, maxVersion=0), ApiVersion(apiKey=25, minVersion=0, maxVersion=0), ApiVersion(apiKey=26, minVersion=0, maxVersion=0), ApiVersion(apiKey=27, minVersion=0, maxVersion=0), ApiVersion(apiKey=28, minVersion=0, maxVersion=0), ApiVersion(apiKey=29, minVersion=0, maxVersion=0), ApiVersion(apiKey=30, minVersion=0, maxVersion=0), ApiVersion(apiKey=31, minVersion=0, maxVersion=0), ApiVersion(apiKey=32, minVersion=0, maxVersion=0), 
ApiVersion(apiKey=33, minVersion=0, maxVersion=0)], throttleTimeMs=0, supportedFeatures=[], finalizedFeaturesEpoch=-1, finalizedFeatures=[])
17:16:26.515 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Node -1 has finalized features epoch: -1, finalized features: [], supported features: [], API versions: (Produce(0): 0 to 3 [usable: 3], Fetch(1): 0 to 5 [usable: 5], ListOffsets(2): 0 to 2 [usable: 2], Metadata(3): 0 to 4 [usable: 4], LeaderAndIsr(4): 0 [usable: 0], StopReplica(5): 0 [usable: 0], UpdateMetadata(6): 0 to 3 [usable: 3], ControlledShutdown(7): 1 [usable: 1], OffsetCommit(8): 0 to 3 [usable: 3], OffsetFetch(9): 0 to 3 [usable: 3], FindCoordinator(10): 0 to 1 [usable: 1], JoinGroup(11): 0 to 2 [usable: 2], Heartbeat(12): 0 to 1 [usable: 1], LeaveGroup(13): 0 to 1 [usable: 1], SyncGroup(14): 0 to 1 [usable: 1], DescribeGroups(15): 0 to 1 [usable: 1], ListGroups(16): 0 to 1 [usable: 1], SaslHandshake(17): 0 [usable: 0], ApiVersions(18): 0 to 1 [usable: 1], CreateTopics(19): 0 to 2 [usable: 2], DeleteTopics(20): 0 to 1 [usable: 1], DeleteRecords(21): 0 [usable: 0], InitProducerId(22): 0 [usable: 0], OffsetForLeaderEpoch(23): 0 [usable: 0], AddPartitionsToTxn(24): 0 [usable: 0], AddOffsetsToTxn(25): 0 [usable: 0], EndTxn(26): 0 [usable: 0], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 [usable: 0], DescribeAcls(29): 0 [usable: 0], CreateAcls(30): 0 [usable: 0], DeleteAcls(31): 0 [usable: 0], DescribeConfigs(32): 0 [usable: 0], AlterConfigs(33): 0 [usable: 0], AlterReplicaLogDirs(34): UNSUPPORTED, DescribeLogDirs(35): UNSUPPORTED, SaslAuthenticate(36): UNSUPPORTED, CreatePartitions(37): UNSUPPORTED, CreateDelegationToken(38): UNSUPPORTED, RenewDelegationToken(39): UNSUPPORTED, ExpireDelegationToken(40): UNSUPPORTED, DescribeDelegationToken(41): UNSUPPORTED, DeleteGroups(42): UNSUPPORTED, ElectLeaders(43): UNSUPPORTED, IncrementalAlterConfigs(44): UNSUPPORTED, AlterPartitionReassignments(45): UNSUPPORTED, ListPartitionReassignments(46): UNSUPPORTED, OffsetDelete(47): UNSUPPORTED, DescribeClientQuotas(48): UNSUPPORTED, 
AlterClientQuotas(49): UNSUPPORTED, DescribeUserScramCredentials(50): UNSUPPORTED, AlterUserScramCredentials(51): UNSUPPORTED, AlterIsr(56): UNSUPPORTED, UpdateFeatures(57): UNSUPPORTED, DescribeCluster(60): UNSUPPORTED, DescribeProducers(61): UNSUPPORTED).
17:16:26.518 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Sending metadata request MetadataRequestData(topics=[MetadataRequestTopic(topicId=AAAAAAAAAAAAAAAAAAAAAA, name='my-topic')], allowAutoTopicCreation=true, includeClusterAuthorizedOperations=false, includeTopicAuthorizedOperations=false) to node localhost:9092 (id: -1 rack: null)
17:16:26.518 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Sending METADATA request with header RequestHeader(apiKey=METADATA, apiVersion=4, clientId=consumer-test-1, correlationId=3) and timeout 30000 to node -1: MetadataRequestData(topics=[MetadataRequestTopic(topicId=AAAAAAAAAAAAAAAAAAAAAA, name='my-topic')], allowAutoTopicCreation=true, includeClusterAuthorizedOperations=false, includeTopicAuthorizedOperations=false)
17:16:26.519 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Sending FIND_COORDINATOR request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=1, clientId=consumer-test-1, correlationId=0) and timeout 30000 to node -1: FindCoordinatorRequestData(key='test', keyType=0)
17:16:26.521 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Received METADATA response from node -1 for request with header RequestHeader(apiKey=METADATA, apiVersion=4, clientId=consumer-test-1, correlationId=3): MetadataResponseData(throttleTimeMs=0, brokers=[MetadataResponseBroker(nodeId=0, host='windows10.microdone.cn', port=9092, rack=null)], clusterId='uQsKPlQjQBqoiESUqk324Q', controllerId=0, topics=[MetadataResponseTopic(errorCode=0, name='my-topic', topicId=AAAAAAAAAAAAAAAAAAAAAA, isInternal=false, partitions=[MetadataResponsePartition(errorCode=0, partitionIndex=0, leaderId=0, leaderEpoch=-1, replicaNodes=[0], isrNodes=[0], offlineReplicas=[])], topicAuthorizedOperations=-2147483648)], clusterAuthorizedOperations=-2147483648)
17:16:26.532 [main] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-test-1, groupId=test] Cluster ID: uQsKPlQjQBqoiESUqk324Q
17:16:26.533 [main] DEBUG org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-test-1, groupId=test] Updated cluster metadata updateVersion 2 to MetadataCache{clusterId='uQsKPlQjQBqoiESUqk324Q', nodes={0=windows10.microdone.cn:9092 (id: 0 rack: null)}, partitions=[PartitionMetadata(error=NONE, partition=my-topic-0, leader=Optional[0], leaderEpoch=Optional.empty, replicas=0, isr=0, offlineReplicas=)], controller=windows10.microdone.cn:9092 (id: 0 rack: null)}
17:16:26.533 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Received FIND_COORDINATOR response from node -1 for request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=1, clientId=consumer-test-1, correlationId=0): FindCoordinatorResponseData(throttleTimeMs=0, errorCode=0, errorMessage=null, nodeId=0, host='windows10.microdone.cn', port=9092)
17:16:26.534 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-test-1, groupId=test] Received FindCoordinator response ClientResponse(receivedTimeMs=1621329386533, latencyMs=189, disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=1, clientId=consumer-test-1, correlationId=0), responseBody=FindCoordinatorResponseData(throttleTimeMs=0, errorCode=0, errorMessage=null, nodeId=0, host='windows10.microdone.cn', port=9092))
17:16:26.534 [main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-test-1, groupId=test] Discovered group coordinator windows10.microdone.cn:9092 (id: 2147483647 rack: null)
17:16:26.537 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Initiating connection to node windows10.microdone.cn:9092 (id: 2147483647 rack: null) using address windows10.microdone.cn/192.168.56.1
17:16:26.540 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-1, groupId=test] Executing onJoinPrepare with generation -1 and memberId
17:16:26.541 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-1, groupId=test] Sending synchronous auto-commit of offsets {}
17:16:26.541 [main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-test-1, groupId=test] (Re-)joining group
17:16:26.542 [kafka-coordinator-heartbeat-thread | test] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-test-1, groupId=test] Heartbeat thread started
17:16:26.542 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-1, groupId=test] Joining group with current subscription: [my-topic]
17:16:26.548 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-test-1, groupId=test] Sending JoinGroup (JoinGroupRequestData(groupId='test', sessionTimeoutMs=10000, rebalanceTimeoutMs=300000, memberId='', groupInstanceId=null, protocolType='consumer', protocols=[JoinGroupRequestProtocol(name='range', metadata=[0, 1, 0, 0, 0, 1, 0, 8, 109, 121, 45, 116, 111, 112, 105, 99, -1, -1, -1, -1, 0, 0, 0, 0])])) to coordinator windows10.microdone.cn:9092 (id: 2147483647 rack: null)
17:16:26.552 [main] DEBUG org.apache.kafka.common.network.Selector - [Consumer clientId=consumer-test-1, groupId=test] Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node 2147483647
17:16:26.552 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Completed connection to node 2147483647. Fetching API versions.
17:16:26.552 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Initiating API versions fetch from node 2147483647.
17:16:26.552 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Sending API_VERSIONS request with header RequestHeader(apiKey=API_VERSIONS, apiVersion=3, clientId=consumer-test-1, correlationId=5) and timeout 30000 to node 2147483647: ApiVersionsRequestData(clientSoftwareName='apache-kafka-java', clientSoftwareVersion='2.8.0')
17:16:26.558 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Received API_VERSIONS response from node 2147483647 for request with header RequestHeader(apiKey=API_VERSIONS, apiVersion=3, clientId=consumer-test-1, correlationId=5): ApiVersionsResponseData(errorCode=35, apiKeys=[], throttleTimeMs=0, supportedFeatures=[], finalizedFeaturesEpoch=-1, finalizedFeatures=[])
17:16:26.558 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Initiating API versions fetch from node 2147483647.
17:16:26.558 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Sending API_VERSIONS request with header RequestHeader(apiKey=API_VERSIONS, apiVersion=0, clientId=consumer-test-1, correlationId=6) and timeout 30000 to node 2147483647: ApiVersionsRequestData(clientSoftwareName='apache-kafka-java', clientSoftwareVersion='2.8.0')
17:16:26.563 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Received API_VERSIONS response from node 2147483647 for request with header RequestHeader(apiKey=API_VERSIONS, apiVersion=0, clientId=consumer-test-1, correlationId=6): ApiVersionsResponseData(errorCode=0, apiKeys=[ApiVersion(apiKey=0, minVersion=0, maxVersion=3), ApiVersion(apiKey=1, minVersion=0, maxVersion=5), ApiVersion(apiKey=2, minVersion=0, maxVersion=2), ApiVersion(apiKey=3, minVersion=0, maxVersion=4), ApiVersion(apiKey=4, minVersion=0, maxVersion=0), ApiVersion(apiKey=5, minVersion=0, maxVersion=0), ApiVersion(apiKey=6, minVersion=0, maxVersion=3), ApiVersion(apiKey=7, minVersion=1, maxVersion=1), ApiVersion(apiKey=8, minVersion=0, maxVersion=3), ApiVersion(apiKey=9, minVersion=0, maxVersion=3), ApiVersion(apiKey=10, minVersion=0, maxVersion=1), ApiVersion(apiKey=11, minVersion=0, maxVersion=2), ApiVersion(apiKey=12, minVersion=0, maxVersion=1), ApiVersion(apiKey=13, minVersion=0, maxVersion=1), ApiVersion(apiKey=14, minVersion=0, maxVersion=1), ApiVersion(apiKey=15, minVersion=0, maxVersion=1), ApiVersion(apiKey=16, minVersion=0, maxVersion=1), ApiVersion(apiKey=17, minVersion=0, maxVersion=0), ApiVersion(apiKey=18, minVersion=0, maxVersion=1), ApiVersion(apiKey=19, minVersion=0, maxVersion=2), ApiVersion(apiKey=20, minVersion=0, maxVersion=1), ApiVersion(apiKey=21, minVersion=0, maxVersion=0), ApiVersion(apiKey=22, minVersion=0, maxVersion=0), ApiVersion(apiKey=23, minVersion=0, maxVersion=0), ApiVersion(apiKey=24, minVersion=0, maxVersion=0), ApiVersion(apiKey=25, minVersion=0, maxVersion=0), ApiVersion(apiKey=26, minVersion=0, maxVersion=0), ApiVersion(apiKey=27, minVersion=0, maxVersion=0), ApiVersion(apiKey=28, minVersion=0, maxVersion=0), ApiVersion(apiKey=29, minVersion=0, maxVersion=0), ApiVersion(apiKey=30, minVersion=0, maxVersion=0), ApiVersion(apiKey=31, minVersion=0, maxVersion=0), ApiVersion(apiKey=32, minVersion=0, 
maxVersion=0), ApiVersion(apiKey=33, minVersion=0, maxVersion=0)], throttleTimeMs=0, supportedFeatures=[], finalizedFeaturesEpoch=-1, finalizedFeatures=[])
17:16:26.563 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Node 2147483647 has finalized features epoch: -1, finalized features: [], supported features: [], API versions: (Produce(0): 0 to 3 [usable: 3], Fetch(1): 0 to 5 [usable: 5], ListOffsets(2): 0 to 2 [usable: 2], Metadata(3): 0 to 4 [usable: 4], LeaderAndIsr(4): 0 [usable: 0], StopReplica(5): 0 [usable: 0], UpdateMetadata(6): 0 to 3 [usable: 3], ControlledShutdown(7): 1 [usable: 1], OffsetCommit(8): 0 to 3 [usable: 3], OffsetFetch(9): 0 to 3 [usable: 3], FindCoordinator(10): 0 to 1 [usable: 1], JoinGroup(11): 0 to 2 [usable: 2], Heartbeat(12): 0 to 1 [usable: 1], LeaveGroup(13): 0 to 1 [usable: 1], SyncGroup(14): 0 to 1 [usable: 1], DescribeGroups(15): 0 to 1 [usable: 1], ListGroups(16): 0 to 1 [usable: 1], SaslHandshake(17): 0 [usable: 0], ApiVersions(18): 0 to 1 [usable: 1], CreateTopics(19): 0 to 2 [usable: 2], DeleteTopics(20): 0 to 1 [usable: 1], DeleteRecords(21): 0 [usable: 0], InitProducerId(22): 0 [usable: 0], OffsetForLeaderEpoch(23): 0 [usable: 0], AddPartitionsToTxn(24): 0 [usable: 0], AddOffsetsToTxn(25): 0 [usable: 0], EndTxn(26): 0 [usable: 0], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 [usable: 0], DescribeAcls(29): 0 [usable: 0], CreateAcls(30): 0 [usable: 0], DeleteAcls(31): 0 [usable: 0], DescribeConfigs(32): 0 [usable: 0], AlterConfigs(33): 0 [usable: 0], AlterReplicaLogDirs(34): UNSUPPORTED, DescribeLogDirs(35): UNSUPPORTED, SaslAuthenticate(36): UNSUPPORTED, CreatePartitions(37): UNSUPPORTED, CreateDelegationToken(38): UNSUPPORTED, RenewDelegationToken(39): UNSUPPORTED, ExpireDelegationToken(40): UNSUPPORTED, DescribeDelegationToken(41): UNSUPPORTED, DeleteGroups(42): UNSUPPORTED, ElectLeaders(43): UNSUPPORTED, IncrementalAlterConfigs(44): UNSUPPORTED, AlterPartitionReassignments(45): UNSUPPORTED, ListPartitionReassignments(46): UNSUPPORTED, OffsetDelete(47): UNSUPPORTED, DescribeClientQuotas(48): UNSUPPORTED, 
AlterClientQuotas(49): UNSUPPORTED, DescribeUserScramCredentials(50): UNSUPPORTED, AlterUserScramCredentials(51): UNSUPPORTED, AlterIsr(56): UNSUPPORTED, UpdateFeatures(57): UNSUPPORTED, DescribeCluster(60): UNSUPPORTED, DescribeProducers(61): UNSUPPORTED).
17:16:26.564 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Sending JOIN_GROUP request with header RequestHeader(apiKey=JOIN_GROUP, apiVersion=2, clientId=consumer-test-1, correlationId=4) and timeout 305000 to node 2147483647: JoinGroupRequestData(groupId='test', sessionTimeoutMs=10000, rebalanceTimeoutMs=300000, memberId='', groupInstanceId=null, protocolType='consumer', protocols=[JoinGroupRequestProtocol(name='range', metadata=[0, 1, 0, 0, 0, 1, 0, 8, 109, 121, 45, 116, 111, 112, 105, 99, -1, -1, -1, -1, 0, 0, 0, 0])])
17:16:26.574 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Received JOIN_GROUP response from node 2147483647 for request with header RequestHeader(apiKey=JOIN_GROUP, apiVersion=2, clientId=consumer-test-1, correlationId=4): JoinGroupResponseData(throttleTimeMs=0, errorCode=0, generationId=16, protocolType=null, protocolName='range', leader='consumer-test-1-0644a6b7-a981-406e-aa50-9de01fc6c77f', memberId='consumer-test-1-0644a6b7-a981-406e-aa50-9de01fc6c77f', members=[JoinGroupResponseMember(memberId='consumer-test-1-0644a6b7-a981-406e-aa50-9de01fc6c77f', groupInstanceId=null, metadata=[0, 1, 0, 0, 0, 1, 0, 8, 109, 121, 45, 116, 111, 112, 105, 99, -1, -1, -1, -1, 0, 0, 0, 0])])
17:16:26.574 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-test-1, groupId=test] Received successful JoinGroup response: JoinGroupResponseData(throttleTimeMs=0, errorCode=0, generationId=16, protocolType=null, protocolName='range', leader='consumer-test-1-0644a6b7-a981-406e-aa50-9de01fc6c77f', memberId='consumer-test-1-0644a6b7-a981-406e-aa50-9de01fc6c77f', members=[JoinGroupResponseMember(memberId='consumer-test-1-0644a6b7-a981-406e-aa50-9de01fc6c77f', groupInstanceId=null, metadata=[0, 1, 0, 0, 0, 1, 0, 8, 109, 121, 45, 116, 111, 112, 105, 99, -1, -1, -1, -1, 0, 0, 0, 0])])
17:16:26.575 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-test-1, groupId=test] Enabling heartbeat thread
17:16:26.575 [main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-test-1, groupId=test] Successfully joined group with generation Generation{generationId=16, memberId='consumer-test-1-0644a6b7-a981-406e-aa50-9de01fc6c77f', protocol='range'}
17:16:26.575 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-1, groupId=test] Performing assignment using strategy range with subscriptions {consumer-test-1-0644a6b7-a981-406e-aa50-9de01fc6c77f=org.apache.kafka.clients.consumer.ConsumerPartitionAssignor$Subscription@568ff82}
17:16:26.577 [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-1, groupId=test] Finished assignment for group at generation 16: {consumer-test-1-0644a6b7-a981-406e-aa50-9de01fc6c77f=Assignment(partitions=[my-topic-0])}
17:16:26.579 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-test-1, groupId=test] Sending leader SyncGroup to coordinator windows10.microdone.cn:9092 (id: 2147483647 rack: null) at generation Generation{generationId=16, memberId='consumer-test-1-0644a6b7-a981-406e-aa50-9de01fc6c77f', protocol='range'}: SyncGroupRequestData(groupId='test', generationId=16, memberId='consumer-test-1-0644a6b7-a981-406e-aa50-9de01fc6c77f', groupInstanceId=null, protocolType='consumer', protocolName='range', assignments=[SyncGroupRequestAssignment(memberId='consumer-test-1-0644a6b7-a981-406e-aa50-9de01fc6c77f', assignment=[0, 1, 0, 0, 0, 1, 0, 8, 109, 121, 45, 116, 111, 112, 105, 99, 0, 0, 0, 1, 0, 0, 0, 0, -1, -1, -1, -1])])
17:16:26.580 [kafka-coordinator-heartbeat-thread | test] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Sending SYNC_GROUP request with header RequestHeader(apiKey=SYNC_GROUP, apiVersion=1, clientId=consumer-test-1, correlationId=7) and timeout 30000 to node 2147483647: SyncGroupRequestData(groupId='test', generationId=16, memberId='consumer-test-1-0644a6b7-a981-406e-aa50-9de01fc6c77f', groupInstanceId=null, protocolType='consumer', protocolName='range', assignments=[SyncGroupRequestAssignment(memberId='consumer-test-1-0644a6b7-a981-406e-aa50-9de01fc6c77f', assignment=[0, 1, 0, 0, 0, 1, 0, 8, 109, 121, 45, 116, 111, 112, 105, 99, 0, 0, 0, 1, 0, 0, 0, 0, -1, -1, -1, -1])])
17:16:26.583 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Received SYNC_GROUP response from node 2147483647 for request with header RequestHeader(apiKey=SYNC_GROUP, apiVersion=1, clientId=consumer-test-1, correlationId=7): SyncGroupResponseData(throttleTimeMs=0, errorCode=0, protocolType=null, protocolName=null, assignment=[0, 1, 0, 0, 0, 1, 0, 8, 109, 121, 45, 116, 111, 112, 105, 99, 0, 0, 0, 1, 0, 0, 0, 0, -1, -1, -1, -1])
17:16:26.583 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-test-1, groupId=test] Received successful SyncGroup response: SyncGroupResponseData(throttleTimeMs=0, errorCode=0, protocolType=null, protocolName=null, assignment=[0, 1, 0, 0, 0, 1, 0, 8, 109, 121, 45, 116, 111, 112, 105, 99, 0, 0, 0, 1, 0, 0, 0, 0, -1, -1, -1, -1])
17:16:26.583 [main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-test-1, groupId=test] Successfully synced group in generation Generation{generationId=16, memberId='consumer-test-1-0644a6b7-a981-406e-aa50-9de01fc6c77f', protocol='range'}
17:16:26.584 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-1, groupId=test] Executing onJoinComplete with generation 16 and memberId consumer-test-1-0644a6b7-a981-406e-aa50-9de01fc6c77f
17:16:26.585 [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-1, groupId=test] Notifying assignor about the new Assignment(partitions=[my-topic-0])
17:16:26.588 [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-1, groupId=test] Adding newly assigned partitions: my-topic-0
17:16:26.592 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-1, groupId=test] Fetching committed offsets for partitions: [my-topic-0]
17:16:26.593 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Sending OFFSET_FETCH request with header RequestHeader(apiKey=OFFSET_FETCH, apiVersion=3, clientId=consumer-test-1, correlationId=8) and timeout 30000 to node 2147483647: OffsetFetchRequestData(groupId='test', topics=[OffsetFetchRequestTopic(name='my-topic', partitionIndexes=[0])], requireStable=false)
17:16:26.595 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Received OFFSET_FETCH response from node 2147483647 for request with header RequestHeader(apiKey=OFFSET_FETCH, apiVersion=3, clientId=consumer-test-1, correlationId=8): OffsetFetchResponseData(throttleTimeMs=0, topics=[OffsetFetchResponseTopic(name='my-topic', partitions=[OffsetFetchResponsePartition(partitionIndex=0, committedOffset=100, committedLeaderEpoch=-1, metadata='', errorCode=0)])], errorCode=0)
17:16:26.598 [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-1, groupId=test] Setting offset for partition my-topic-0 to the committed offset FetchPosition{offset=100, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[windows10.microdone.cn:9092 (id: 0 rack: null)], epoch=absent}}
17:16:26.603 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-test-1, groupId=test] Added READ_UNCOMMITTED fetch request for partition my-topic-0 at position FetchPosition{offset=100, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[windows10.microdone.cn:9092 (id: 0 rack: null)], epoch=absent}} to node windows10.microdone.cn:9092 (id: 0 rack: null)
17:16:26.603 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-test-1, groupId=test] Built full fetch (sessionId=INVALID, epoch=INITIAL) for node 0 with 1 partition(s).
17:16:26.604 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-test-1, groupId=test] Sending READ_UNCOMMITTED FullFetchRequest(my-topic-0) to broker windows10.microdone.cn:9092 (id: 0 rack: null)
17:16:26.605 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Initiating connection to node windows10.microdone.cn:9092 (id: 0 rack: null) using address windows10.microdone.cn/192.168.56.1
17:16:26.607 [main] DEBUG org.apache.kafka.common.network.Selector - [Consumer clientId=consumer-test-1, groupId=test] Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node 0
17:16:26.608 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Completed connection to node 0. Fetching API versions.
17:16:26.608 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Initiating API versions fetch from node 0.
17:16:26.608 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Sending API_VERSIONS request with header RequestHeader(apiKey=API_VERSIONS, apiVersion=3, clientId=consumer-test-1, correlationId=10) and timeout 30000 to node 0: ApiVersionsRequestData(clientSoftwareName='apache-kafka-java', clientSoftwareVersion='2.8.0')
17:16:26.609 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Received API_VERSIONS response from node 0 for request with header RequestHeader(apiKey=API_VERSIONS, apiVersion=3, clientId=consumer-test-1, correlationId=10): ApiVersionsResponseData(errorCode=35, apiKeys=[], throttleTimeMs=0, supportedFeatures=[], finalizedFeaturesEpoch=-1, finalizedFeatures=[])
17:16:26.610 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Initiating API versions fetch from node 0.
17:16:26.610 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Sending API_VERSIONS request with header RequestHeader(apiKey=API_VERSIONS, apiVersion=0, clientId=consumer-test-1, correlationId=11) and timeout 30000 to node 0: ApiVersionsRequestData(clientSoftwareName='apache-kafka-java', clientSoftwareVersion='2.8.0')
17:16:26.611 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Received API_VERSIONS response from node 0 for request with header RequestHeader(apiKey=API_VERSIONS, apiVersion=0, clientId=consumer-test-1, correlationId=11): ApiVersionsResponseData(errorCode=0, apiKeys=[ApiVersion(apiKey=0, minVersion=0, maxVersion=3), ApiVersion(apiKey=1, minVersion=0, maxVersion=5), ApiVersion(apiKey=2, minVersion=0, maxVersion=2), ApiVersion(apiKey=3, minVersion=0, maxVersion=4), ApiVersion(apiKey=4, minVersion=0, maxVersion=0), ApiVersion(apiKey=5, minVersion=0, maxVersion=0), ApiVersion(apiKey=6, minVersion=0, maxVersion=3), ApiVersion(apiKey=7, minVersion=1, maxVersion=1), ApiVersion(apiKey=8, minVersion=0, maxVersion=3), ApiVersion(apiKey=9, minVersion=0, maxVersion=3), ApiVersion(apiKey=10, minVersion=0, maxVersion=1), ApiVersion(apiKey=11, minVersion=0, maxVersion=2), ApiVersion(apiKey=12, minVersion=0, maxVersion=1), ApiVersion(apiKey=13, minVersion=0, maxVersion=1), ApiVersion(apiKey=14, minVersion=0, maxVersion=1), ApiVersion(apiKey=15, minVersion=0, maxVersion=1), ApiVersion(apiKey=16, minVersion=0, maxVersion=1), ApiVersion(apiKey=17, minVersion=0, maxVersion=0), ApiVersion(apiKey=18, minVersion=0, maxVersion=1), ApiVersion(apiKey=19, minVersion=0, maxVersion=2), ApiVersion(apiKey=20, minVersion=0, maxVersion=1), ApiVersion(apiKey=21, minVersion=0, maxVersion=0), ApiVersion(apiKey=22, minVersion=0, maxVersion=0), ApiVersion(apiKey=23, minVersion=0, maxVersion=0), ApiVersion(apiKey=24, minVersion=0, maxVersion=0), ApiVersion(apiKey=25, minVersion=0, maxVersion=0), ApiVersion(apiKey=26, minVersion=0, maxVersion=0), ApiVersion(apiKey=27, minVersion=0, maxVersion=0), ApiVersion(apiKey=28, minVersion=0, maxVersion=0), ApiVersion(apiKey=29, minVersion=0, maxVersion=0), ApiVersion(apiKey=30, minVersion=0, maxVersion=0), ApiVersion(apiKey=31, minVersion=0, maxVersion=0), ApiVersion(apiKey=32, minVersion=0, maxVersion=0), 
ApiVersion(apiKey=33, minVersion=0, maxVersion=0)], throttleTimeMs=0, supportedFeatures=[], finalizedFeaturesEpoch=-1, finalizedFeatures=[])
17:16:26.612 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Node 0 has finalized features epoch: -1, finalized features: [], supported features: [], API versions: (Produce(0): 0 to 3 [usable: 3], Fetch(1): 0 to 5 [usable: 5], ListOffsets(2): 0 to 2 [usable: 2], Metadata(3): 0 to 4 [usable: 4], LeaderAndIsr(4): 0 [usable: 0], StopReplica(5): 0 [usable: 0], UpdateMetadata(6): 0 to 3 [usable: 3], ControlledShutdown(7): 1 [usable: 1], OffsetCommit(8): 0 to 3 [usable: 3], OffsetFetch(9): 0 to 3 [usable: 3], FindCoordinator(10): 0 to 1 [usable: 1], JoinGroup(11): 0 to 2 [usable: 2], Heartbeat(12): 0 to 1 [usable: 1], LeaveGroup(13): 0 to 1 [usable: 1], SyncGroup(14): 0 to 1 [usable: 1], DescribeGroups(15): 0 to 1 [usable: 1], ListGroups(16): 0 to 1 [usable: 1], SaslHandshake(17): 0 [usable: 0], ApiVersions(18): 0 to 1 [usable: 1], CreateTopics(19): 0 to 2 [usable: 2], DeleteTopics(20): 0 to 1 [usable: 1], DeleteRecords(21): 0 [usable: 0], InitProducerId(22): 0 [usable: 0], OffsetForLeaderEpoch(23): 0 [usable: 0], AddPartitionsToTxn(24): 0 [usable: 0], AddOffsetsToTxn(25): 0 [usable: 0], EndTxn(26): 0 [usable: 0], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 [usable: 0], DescribeAcls(29): 0 [usable: 0], CreateAcls(30): 0 [usable: 0], DeleteAcls(31): 0 [usable: 0], DescribeConfigs(32): 0 [usable: 0], AlterConfigs(33): 0 [usable: 0], AlterReplicaLogDirs(34): UNSUPPORTED, DescribeLogDirs(35): UNSUPPORTED, SaslAuthenticate(36): UNSUPPORTED, CreatePartitions(37): UNSUPPORTED, CreateDelegationToken(38): UNSUPPORTED, RenewDelegationToken(39): UNSUPPORTED, ExpireDelegationToken(40): UNSUPPORTED, DescribeDelegationToken(41): UNSUPPORTED, DeleteGroups(42): UNSUPPORTED, ElectLeaders(43): UNSUPPORTED, IncrementalAlterConfigs(44): UNSUPPORTED, AlterPartitionReassignments(45): UNSUPPORTED, ListPartitionReassignments(46): UNSUPPORTED, OffsetDelete(47): UNSUPPORTED, DescribeClientQuotas(48): UNSUPPORTED, 
AlterClientQuotas(49): UNSUPPORTED, DescribeUserScramCredentials(50): UNSUPPORTED, AlterUserScramCredentials(51): UNSUPPORTED, AlterIsr(56): UNSUPPORTED, UpdateFeatures(57): UNSUPPORTED, DescribeCluster(60): UNSUPPORTED, DescribeProducers(61): UNSUPPORTED).
17:16:26.616 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Sending FETCH request with header RequestHeader(apiKey=FETCH, apiVersion=5, clientId=consumer-test-1, correlationId=9) and timeout 30000 to node 0: FetchRequestData(clusterId=null, replicaId=-1, maxWaitMs=500, minBytes=1, maxBytes=52428800, isolationLevel=0, sessionId=0, sessionEpoch=0, topics=[FetchTopic(topic='my-topic', partitions=[FetchPartition(partition=0, currentLeaderEpoch=-1, fetchOffset=100, lastFetchedEpoch=-1, logStartOffset=-1, partitionMaxBytes=1048576)])], forgottenTopicsData=[], rackId='')
17:16:27.128 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Received FETCH response from node 0 for request with header RequestHeader(apiKey=FETCH, apiVersion=5, clientId=consumer-test-1, correlationId=9): FetchResponseData(throttleTimeMs=0, errorCode=0, sessionId=0, responses=[FetchableTopicResponse(topic='my-topic', partitionResponses=[FetchablePartitionResponse(partition=0, errorCode=0, highWatermark=100, lastStableOffset=-1, logStartOffset=0, divergingEpoch=EpochEndOffset(epoch=-1, endOffset=-1), currentLeader=LeaderIdAndEpoch(leaderId=-1, leaderEpoch=-1), snapshotId=SnapshotId(endOffset=-1, epoch=-1), abortedTransactions=null, preferredReadReplica=-1, recordSet=MemoryRecords(size=0, buffer=java.nio.HeapByteBuffer[pos=0 lim=0 cap=0]))])])
17:16:27.128 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-test-1, groupId=test] Node 0 sent a full fetch response with 1 response partition(s)
17:16:27.130 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-test-1, groupId=test] Fetch READ_UNCOMMITTED at offset 100 for partition my-topic-0 returned fetch data (error=NONE, highWaterMark=100, lastStableOffset = -1, logStartOffset = 0, preferredReadReplica = absent, abortedTransactions = null, divergingEpoch =Optional.empty, recordsSizeInBytes=0)
17:16:27.133 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-test-1, groupId=test] Added READ_UNCOMMITTED fetch request for partition my-topic-0 at position FetchPosition{offset=100, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[windows10.microdone.cn:9092 (id: 0 rack: null)], epoch=absent}} to node windows10.microdone.cn:9092 (id: 0 rack: null)
17:16:27.133 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-test-1, groupId=test] Built full fetch (sessionId=INVALID, epoch=INITIAL) for node 0 with 1 partition(s).
17:16:27.133 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-test-1, groupId=test] Sending READ_UNCOMMITTED FullFetchRequest(my-topic-0) to broker windows10.microdone.cn:9092 (id: 0 rack: null)
17:16:27.134 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Sending FETCH request with header RequestHeader(apiKey=FETCH, apiVersion=5, clientId=consumer-test-1, correlationId=12) and timeout 30000 to node 0: FetchRequestData(clusterId=null, replicaId=-1, maxWaitMs=500, minBytes=1, maxBytes=52428800, isolationLevel=0, sessionId=0, sessionEpoch=0, topics=[FetchTopic(topic='my-topic', partitions=[FetchPartition(partition=0, currentLeaderEpoch=-1, fetchOffset=100, lastFetchedEpoch=-1, logStartOffset=-1, partitionMaxBytes=1048576)])], forgottenTopicsData=[], rackId='')
17:16:27.586 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-1, groupId=test] Sending asynchronous auto-commit of offsets {my-topic-0=OffsetAndMetadata{offset=100, leaderEpoch=null, metadata=''}}
17:16:27.588 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Sending OFFSET_COMMIT request with header RequestHeader(apiKey=OFFSET_COMMIT, apiVersion=3, clientId=consumer-test-1, correlationId=13) and timeout 30000 to node 2147483647: OffsetCommitRequestData(groupId='test', generationId=16, memberId='consumer-test-1-0644a6b7-a981-406e-aa50-9de01fc6c77f', groupInstanceId=null, retentionTimeMs=-1, topics=[OffsetCommitRequestTopic(name='my-topic', partitions=[OffsetCommitRequestPartition(partitionIndex=0, committedOffset=100, committedLeaderEpoch=-1, commitTimestamp=-1, committedMetadata='')])])
17:16:27.590 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Received OFFSET_COMMIT response from node 2147483647 for request with header RequestHeader(apiKey=OFFSET_COMMIT, apiVersion=3, clientId=consumer-test-1, correlationId=13): OffsetCommitResponseData(throttleTimeMs=0, topics=[OffsetCommitResponseTopic(name='my-topic', partitions=[OffsetCommitResponsePartition(partitionIndex=0, errorCode=0)])])
17:16:27.590 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-1, groupId=test] Committed offset 100 for partition my-topic-0
17:16:27.590 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-1, groupId=test] Completed asynchronous auto-commit of offsets {my-topic-0=OffsetAndMetadata{offset=100, leaderEpoch=null, metadata=''}}
17:16:27.636 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Received FETCH response from node 0 for request with header RequestHeader(apiKey=FETCH, apiVersion=5, clientId=consumer-test-1, correlationId=12): FetchResponseData(throttleTimeMs=0, errorCode=0, sessionId=0, responses=[FetchableTopicResponse(topic='my-topic', partitionResponses=[FetchablePartitionResponse(partition=0, errorCode=0, highWatermark=100, lastStableOffset=-1, logStartOffset=0, divergingEpoch=EpochEndOffset(epoch=-1, endOffset=-1), currentLeader=LeaderIdAndEpoch(leaderId=-1, leaderEpoch=-1), snapshotId=SnapshotId(endOffset=-1, epoch=-1), abortedTransactions=null, preferredReadReplica=-1, recordSet=MemoryRecords(size=0, buffer=java.nio.HeapByteBuffer[pos=0 lim=0 cap=0]))])])
17:16:27.636 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-test-1, groupId=test] Node 0 sent a full fetch response with 1 response partition(s)
17:16:27.636 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-test-1, groupId=test] Fetch READ_UNCOMMITTED at offset 100 for partition my-topic-0 returned fetch data (error=NONE, highWaterMark=100, lastStableOffset = -1, logStartOffset = 0, preferredReadReplica = absent, abortedTransactions = null, divergingEpoch =Optional.empty, recordsSizeInBytes=0)
17:16:27.636 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-test-1, groupId=test] Added READ_UNCOMMITTED fetch request for partition my-topic-0 at position FetchPosition{offset=100, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[windows10.microdone.cn:9092 (id: 0 rack: null)], epoch=absent}} to node windows10.microdone.cn:9092 (id: 0 rack: null)
17:16:27.636 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-test-1, groupId=test] Built full fetch (sessionId=INVALID, epoch=INITIAL) for node 0 with 1 partition(s).
17:16:27.636 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-test-1, groupId=test] Sending READ_UNCOMMITTED FullFetchRequest(my-topic-0) to broker windows10.microdone.cn:9092 (id: 0 rack: null)
17:16:27.637 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Sending FETCH request with header RequestHeader(apiKey=FETCH, apiVersion=5, clientId=consumer-test-1, correlationId=14) and timeout 30000 to node 0: FetchRequestData(clusterId=null, replicaId=-1, maxWaitMs=500, minBytes=1, maxBytes=52428800, isolationLevel=0, sessionId=0, sessionEpoch=0, topics=[FetchTopic(topic='my-topic', partitions=[FetchPartition(partition=0, currentLeaderEpoch=-1, fetchOffset=100, lastFetchedEpoch=-1, logStartOffset=-1, partitionMaxBytes=1048576)])], forgottenTopicsData=[], rackId='')
17:16:28.140 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Received FETCH response from node 0 for request with header RequestHeader(apiKey=FETCH, apiVersion=5, clientId=consumer-test-1, correlationId=14): FetchResponseData(throttleTimeMs=0, errorCode=0, sessionId=0, responses=[FetchableTopicResponse(topic='my-topic', partitionResponses=[FetchablePartitionResponse(partition=0, errorCode=0, highWatermark=100, lastStableOffset=-1, logStartOffset=0, divergingEpoch=EpochEndOffset(epoch=-1, endOffset=-1), currentLeader=LeaderIdAndEpoch(leaderId=-1, leaderEpoch=-1), snapshotId=SnapshotId(endOffset=-1, epoch=-1), abortedTransactions=null, preferredReadReplica=-1, recordSet=MemoryRecords(size=0, buffer=java.nio.HeapByteBuffer[pos=0 lim=0 cap=0]))])])
17:16:28.140 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-test-1, groupId=test] Node 0 sent a full fetch response with 1 response partition(s)
17:16:28.140 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-test-1, groupId=test] Fetch READ_UNCOMMITTED at offset 100 for partition my-topic-0 returned fetch data (error=NONE, highWaterMark=100, lastStableOffset = -1, logStartOffset = 0, preferredReadReplica = absent, abortedTransactions = null, divergingEpoch =Optional.empty, recordsSizeInBytes=0)
17:16:28.140 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-test-1, groupId=test] Added READ_UNCOMMITTED fetch request for partition my-topic-0 at position FetchPosition{offset=100, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[windows10.microdone.cn:9092 (id: 0 rack: null)], epoch=absent}} to node windows10.microdone.cn:9092 (id: 0 rack: null)
17:16:28.141 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-test-1, groupId=test] Built full fetch (sessionId=INVALID, epoch=INITIAL) for node 0 with 1 partition(s).
17:16:28.141 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-test-1, groupId=test] Sending READ_UNCOMMITTED FullFetchRequest(my-topic-0) to broker windows10.microdone.cn:9092 (id: 0 rack: null)
17:16:28.141 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Sending FETCH request with header RequestHeader(apiKey=FETCH, apiVersion=5, clientId=consumer-test-1, correlationId=15) and timeout 30000 to node 0: FetchRequestData(clusterId=null, replicaId=-1, maxWaitMs=500, minBytes=1, maxBytes=52428800, isolationLevel=0, sessionId=0, sessionEpoch=0, topics=[FetchTopic(topic='my-topic', partitions=[FetchPartition(partition=0, currentLeaderEpoch=-1, fetchOffset=100, lastFetchedEpoch=-1, logStartOffset=-1, partitionMaxBytes=1048576)])], forgottenTopicsData=[], rackId='')
17:16:28.587 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-1, groupId=test] Sending asynchronous auto-commit of offsets {my-topic-0=OffsetAndMetadata{offset=100, leaderEpoch=null, metadata=''}}
17:16:28.588 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Sending OFFSET_COMMIT request with header RequestHeader(apiKey=OFFSET_COMMIT, apiVersion=3, clientId=consumer-test-1, correlationId=16) and timeout 30000 to node 2147483647: OffsetCommitRequestData(groupId='test', generationId=16, memberId='consumer-test-1-0644a6b7-a981-406e-aa50-9de01fc6c77f', groupInstanceId=null, retentionTimeMs=-1, topics=[OffsetCommitRequestTopic(name='my-topic', partitions=[OffsetCommitRequestPartition(partitionIndex=0, committedOffset=100, committedLeaderEpoch=-1, commitTimestamp=-1, committedMetadata='')])])
17:16:28.590 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Received OFFSET_COMMIT response from node 2147483647 for request with header RequestHeader(apiKey=OFFSET_COMMIT, apiVersion=3, clientId=consumer-test-1, correlationId=16): OffsetCommitResponseData(throttleTimeMs=0, topics=[OffsetCommitResponseTopic(name='my-topic', partitions=[OffsetCommitResponsePartition(partitionIndex=0, errorCode=0)])])
17:16:28.590 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-1, groupId=test] Committed offset 100 for partition my-topic-0
17:16:28.590 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-1, groupId=test] Completed asynchronous auto-commit of offsets {my-topic-0=OffsetAndMetadata{offset=100, leaderEpoch=null, metadata=''}}
17:16:28.643 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Received FETCH response from node 0 for request with header RequestHeader(apiKey=FETCH, apiVersion=5, clientId=consumer-test-1, correlationId=15): FetchResponseData(throttleTimeMs=0, errorCode=0, sessionId=0, responses=[FetchableTopicResponse(topic='my-topic', partitionResponses=[FetchablePartitionResponse(partition=0, errorCode=0, highWatermark=100, lastStableOffset=-1, logStartOffset=0, divergingEpoch=EpochEndOffset(epoch=-1, endOffset=-1), currentLeader=LeaderIdAndEpoch(leaderId=-1, leaderEpoch=-1), snapshotId=SnapshotId(endOffset=-1, epoch=-1), abortedTransactions=null, preferredReadReplica=-1, recordSet=MemoryRecords(size=0, buffer=java.nio.HeapByteBuffer[pos=0 lim=0 cap=0]))])])
Isn't this all normal?
Send some messages to the topic and the program will receive them.
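So I need to start another thread that sends new messages. Earlier, when I started a consumer from the command window to pull messages from the topic, it also pulled the messages that had been sent before the consumer started. This code, however, does not fetch messages sent before the consumer started; the consumer only receives messages if the producer sends new ones after the consumer is already running. Why is that?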
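It works like a newspaper subscription: the newspaper only learns about you at the moment you subscribe, so only later issues are delivered to you (earlier ones are not). That is why you need to start the consumer first. In your log this corresponds to `auto.offset.reset = latest`: a consumer group with no committed offset starts at the end of the partition.
You can try stopping the consumer, sending a few more messages, and then restarting the consumer: it will then receive the messages the producer sent while it was down. Because you (the consumer group) have already registered with the newspaper, it has a record of you and delivers the issues you missed.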
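The rule described above can be sketched as a small toy model. This is not Kafka's actual code: the class `StartOffsetSketch` and the method `startPosition` are hypothetical names made up for illustration, and the model ignores cases such as a committed offset that is out of range or the `none` reset policy. The numbers come from the log above (`logStartOffset = 0`, `highWatermark = 100`, committed offset 100).

```java
import java.util.OptionalLong;

public class StartOffsetSketch {

    /**
     * Toy model (assumption, not the Kafka implementation) of where a consumer
     * group starts reading a partition.
     *
     * @param committed       the group's committed offset, empty for a brand-new group
     * @param logStart        first offset still stored in the partition
     * @param logEnd          offset that the next produced message will get
     * @param autoOffsetReset value of auto.offset.reset: "earliest" or "latest"
     */
    static long startPosition(OptionalLong committed, long logStart,
                              long logEnd, String autoOffsetReset) {
        // A group that already has a committed offset resumes from it;
        // auto.offset.reset is not consulted in that case.
        if (committed.isPresent()) {
            return committed.getAsLong();
        }
        // Only a group without a committed offset falls back to the reset policy.
        if ("earliest".equals(autoOffsetReset)) {
            return logStart;
        }
        if ("latest".equals(autoOffsetReset)) {
            return logEnd;
        }
        throw new IllegalArgumentException("unsupported policy: " + autoOffsetReset);
    }

    public static void main(String[] args) {
        // New group with "latest": starts at the end and skips the 100 old messages.
        System.out.println(startPosition(OptionalLong.empty(), 0L, 100L, "latest"));   // 100

        // New group with "earliest": starts at the beginning and reads old messages too.
        System.out.println(startPosition(OptionalLong.empty(), 0L, 100L, "earliest")); // 0

        // Known group (committed offset 100) restarted after 5 more messages arrived:
        // it resumes at 100 and receives offsets 100..104.
        System.out.println(startPosition(OptionalLong.of(100L), 0L, 105L, "latest"));  // 100
    }
}
```

In the real client the policy is the `auto.offset.reset` consumer property. Setting it to `earliest` and using a `group.id` that has no committed offsets should make the Java example read the older messages as well. The command-line consumer probably showed old messages because it was started with `--from-beginning`, although the thread does not confirm this.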
I see, that makes sense now. Thank you!