There are already 200+ messages in the Kafka topic, but when I consume with code I only ever get the first 6. The newer messages never come through. Has anyone run into this?
Friend, you need to post your core code first; without it we have no idea what you are doing and cannot help locate the problem.
You can try consuming directly with the official example and see whether that works:
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");
props.setProperty("enable.auto.commit", "true");
props.setProperty("auto.commit.interval.ms", "1000");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records)
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
Source: https://www.orchome.com/451
class ConsumerRunner implements Runnable {

    private KafkaConsumer<String, String> consumer;
    private String clientId;

    ConsumerRunner(String clientId) {
        Properties props = new Properties();
        // Kafka address; multiple broker addresses are separated by commas
        props.put("bootstrap.servers", kafkaAddress);
        // consumer group id
        props.put("group.id", kafkaGroupId);
        // session timeout
        props.put("session.timeout.ms", kafkaConfig.get("session.timeout.ms"));
        // whether auto-commit is enabled
        props.put("enable.auto.commit", kafkaConfig.get("enable.auto.commit"));
        // auto-commit interval
        props.put("auto.commit.interval.ms", kafkaConfig.get("auto.commit.interval.ms"));
        // key deserializer
        props.put("key.deserializer", kafkaConfig.get("key.deserializer"));
        // value deserializer
        props.put("value.deserializer", kafkaConfig.get("value.deserializer"));
        // security.protocol
        props.put("security.protocol", kafkaConfig.get("security.protocol"));
        // sasl.mechanism
        props.put("sasl.mechanism", kafkaConfig.get("sasl.mechanism"));
        String loginInfo = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + kafkaAccount + "\" password=\"" + kafkaPassword + "\";";
        // sasl.jaas.config
        props.put("sasl.jaas.config", loginInfo);
        // what to do when there is no initial offset, or the current offset no longer exists on the server;
        // three options: latest, earliest, none (default is latest)
        props.put("auto.offset.reset", kafkaConfig.get("auto.offset.reset"));
        // maximum number of records returned by a single poll
        props.put("max.poll.records", kafkaConfig.get("max.poll.records"));
        // heartbeat interval
        props.put("heartbeat.interval.ms", kafkaConfig.get("heartbeat.interval.ms"));
        // maximum time allowed for processing between polls
        props.put("max.poll.interval.ms", kafkaConfig.get("max.poll.interval.ms"));
        // maximum time to wait for a request response
        props.put("request.timeout.ms", kafkaConfig.get("request.timeout.ms"));
        this.consumer = new KafkaConsumer<>(props);
        this.clientId = clientId;
    }

    @Override
    public void run() {
        try {
            Thread.currentThread().setName(clientId);
            consumer.subscribe(Collections.singleton(kafkaTopic));
            // poll loop
            while (GlobalVar.ThirdYCPoliceInstance.getInstance().isKafkaIsRunning() && !Thread.currentThread().isInterrupted()) {
                try {
                    ConsumerRecords<String, String> records = consumer.poll(100);
                    for (TopicPartition partition : records.partitions()) {
                        List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                        logger.info("partitionRecords:{}", JSON.toJSONString(partitionRecords));
                        for (ConsumerRecord<String, String> record : partitionRecords) {
                            logger.info("Kafka listener received a message: {}",
                                    String.format("topic: %s, partition: %s, offset: %s, key: %s, value: %s",
                                            record.topic(), record.partition(), record.offset(), record.key(), record.value()));
                            if (StringUtils.isNotBlank(record.value())) {
                                kafkaMessageService.wholeHandlerKafkaMsg(record.value());
                            }
                        }
                        long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                        logger.info("kafka lastOffSet:{}", lastOffset);
                        consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
                    }
                } catch (Exception e) {
                    logger.error("Kafka consume failed!", e);
                }
            }
        } catch (Exception e) {
            logger.error("Kafka consumer loop failed!", e);
        } finally {
            consumer.close();
        }
    }
}
This is the configuration information from the consumer log:
[ConsumerConfig values:
    auto.commit.interval.ms = 1500
    auto.offset.reset = earliest
    bootstrap.servers = [111.111.111.111:9092]
    check.crcs = true
    client.id =
    connections.max.idle.ms = 540000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = TestGroup
    heartbeat.interval.ms = 10000
    interceptor.classes = null
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 60000
    max.poll.records = 30
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 65000
    retry.backoff.ms = 100
    sasl.jaas.config = [hidden]
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = PLAIN
    security.protocol = SASL_PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 30000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
]
I suspect this piece of code:
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
logger.info("kafka lastOffSet:{}", lastOffset);
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
Here
partitionRecords.size()
is clearly not an offset value. Change the commit to
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(record.offset() + 1)));
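(If you make that change, the commit has to move inside the per-record loop. A rough sketch of what that would look like, reusing the variable names from your code above:)

for (ConsumerRecord<String, String> record : partitionRecords) {
    if (StringUtils.isNotBlank(record.value())) {
        kafkaMessageService.wholeHandlerKafkaMsg(record.value());
    }
    // commit the offset of the next record to be fetched for this partition
    consumer.commitSync(Collections.singletonMap(
            partition, new OffsetAndMetadata(record.offset() + 1)));
}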
I suspected that code too. Later I commented it out and switched to auto-commit, but the result was exactly the same.
Check the consumer group's consumption status and paste the result here; the CURRENT-OFFSET and LOG-END-OFFSET columns per partition will show whether the group really is stuck at offset 6:
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group
Is there any other way to confirm this? The Kafka cluster belongs to the customer, and when I asked they would not give me access.
Find any machine with Kafka installed and run the command from there; it will connect to the remote cluster.
The customer got back to me with this error: Error: Executing consumer group command failed due to org.apache.kafka.common.errors.TimeoutException: Call(callName=findCoordinator, deadLineMs=1614796617143) timed out at 1614796617143 after 1 attempt(s)
Can anything be seen from the Kafka broker's run logs? They seem to keep printing: Preparing to read 0 bytes of data for partition 420502007003_1609815065726-0 with offset 6
Your customer is being pretty perfunctory. The command I gave obviously needs your own IP, port, and consumer group filled in; it looks like they ran it with localhost:9092 as-is.
Haha, that is exactly it. No choice, we are the ones begging them for this integration... Let me ask: could this be a permissions problem somewhere? When we first connected to the customer's Kafka it reported Not authorized to access group: testGroup. After the customer said they had granted access I could get data, but only a few records.
Any other ideas? Can I get the consumption status or the latest offset from code?
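(One way to check this from code, as a rough sketch that reuses the consumer, kafkaTopic and logger fields from the ConsumerRunner above: endOffsets() returns the log-end offset per partition and committed() the offset this group has committed, so comparing the two shows whether the broker actually holds data beyond offset 6.)

// list this topic's partitions
List<TopicPartition> tps = consumer.partitionsFor(kafkaTopic).stream()
        .map(p -> new TopicPartition(kafkaTopic, p.partition()))
        .collect(Collectors.toList());
// log-end offset (offset of the next message to be written) per partition
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(tps);
for (TopicPartition tp : tps) {
    // offset last committed by this consumer group, or null if nothing committed yet
    OffsetAndMetadata committed = consumer.committed(tp);
    logger.info("partition={}, committedOffset={}, logEndOffset={}",
            tp.partition(), committed == null ? "none" : committed.offset(), endOffsets.get(tp));
}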
1. Once authentication succeeds, the problem is no longer authentication-related.
2. Is it possible the customer simply has not produced any new messages since?
3. Your threading is written incorrectly. Run it in the main method first (see the sketch below); if more than one thread runs this logic, things will get confused.
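(A minimal sketch of what point 3 means, assuming the ConsumerRunner class posted above and that its static fields such as kafkaAddress are already populated; the client id string here is made up:)

public static void main(String[] args) {
    // run exactly one consumer, synchronously in the main thread, to rule out threading problems
    new ConsumerRunner("debug-consumer").run();
}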
So since I can already get a few messages, it should not be a permission problem. The customer definitely produces new messages; they keep pushing new ones. I have tried resetting the offset and reading again from the beginning:
consumer.poll(0);
consumer.seekToBeginning(consumer.partitionsFor(kafkaTopic).stream()
        .map(partitionInfo -> new TopicPartition(kafkaTopic, partitionInfo.partition()))
        .collect(Collectors.toList()));
but it still only reads up to offset 6. I will go over the code again and see. Thanks.
One more question: I have a machine that can reach the customer's Kafka, and I can install Kafka on it. Their cluster requires account authentication. Can I still use the command-line tools to look at consumption info like this?
Yes, just add the authentication settings to the command (for example, pass a properties file containing the same security.protocol, sasl.mechanism and sasl.jaas.config values via --command-config) and it will work.
https://www.orchome.com/1960#item-0-4
The related articles around that one cover it as well; just pick the correct security mechanism and it will work.