Kafka consumer not consuming messages: poll() returns no data
Here is the code that creates the consumer:
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "172.16.0.93:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("zombie1"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n",
                record.offset(), record.key(), record.value());
    }
}
Using the following command, I can see that messages have already been pushed to Kafka:
bin/kafka-console-consumer.sh --bootstrap-server 172.16.0.93:9092 --topic zombie1 --from-beginning
Has anyone run into this before? Any help appreciated.
I just found this post: https://www.orchome.com/1619 , but in my case that setting is already enabled. My config is:
listeners=PLAINTEXT://172.16.0.93:9092
The startup output before consuming is:
[2019-07-25 14:47:34,604][org.apache.kafka.clients.consumer.ConsumerConfig:165] [INFO ] ConsumerConfig values:
    metric.reporters = []
    metadata.max.age.ms = 300000
    value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    group.id = test
    partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
    reconnect.backoff.ms = 50
    sasl.kerberos.ticket.renew.window.factor = 0.8
    max.partition.fetch.bytes = 1048576
    bootstrap.servers = [172.16.0.93:9092]
    retry.backoff.ms = 100
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    ssl.keystore.type = JKS
    ssl.trustmanager.algorithm = PKIX
    enable.auto.commit = true
    ssl.key.password = null
    fetch.max.wait.ms = 500
    sasl.kerberos.min.time.before.relogin = 60000
    connections.max.idle.ms = 540000
    ssl.truststore.password = null
    session.timeout.ms = 30000
    metrics.num.samples = 2
    client.id =
    ssl.endpoint.identification.algorithm = null
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    ssl.protocol = TLS
    check.crcs = true
    request.timeout.ms = 40000
    ssl.provider = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.keystore.location = null
    heartbeat.interval.ms = 3000
    auto.commit.interval.ms = 1000
    receive.buffer.bytes = 32768
    ssl.cipher.suites = null
    ssl.truststore.type = JKS
    security.protocol = PLAINTEXT
    ssl.truststore.location = null
    ssl.keystore.password = null
    ssl.keymanager.algorithm = SunX509
    metrics.sample.window.ms = 30000
    fetch.min.bytes = 1
    send.buffer.bytes = 131072
    auto.offset.reset = latest
[2019-07-25 14:47:34,719][org.apache.kafka.common.utils.AppInfoParser:82] [INFO ] Kafka version : 0.9.0.1
[2019-07-25 14:47:34,719][org.apache.kafka.common.utils.AppInfoParser:83] [INFO ] Kafka commitId : 23c69d62a0cabf06
That explains it: your log shows auto.offset.reset = latest, so with no committed offset for the group, the consumer starts at the end of the topic. That's also why the console consumer saw the old messages, since you passed --from-beginning. Keep your consumer running and send new messages into the topic, and you'll consume them.
So that means the messages I sent earlier can't be consumed, right?
Right. You can consume the earlier ones by adjusting one parameter, auto.offset.reset (see the sketch below), but usually there's no need to.
It's like subscribing to a newspaper: at first the publisher doesn't know you exist; only from the moment you subscribe does it start delivering to you.
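Here is a minimal sketch of that parameter change, assuming you want to replay the topic from the beginning. Note that auto.offset.reset only applies when the group has no committed offset, so your existing group "test" (which has already committed) won't be affected by it; the group name "test-from-beginning" below is just an illustration:

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "172.16.0.93:9092");
// Use a fresh group.id: auto.offset.reset only takes effect when the
// group has no committed offset yet. (Name is hypothetical.)
props.put("group.id", "test-from-beginning");
// Start from the oldest retained message instead of the default "latest".
props.put("auto.offset.reset", "earliest");
props.put("enable.auto.commit", "true");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("zombie1"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n",
                record.offset(), record.key(), record.value());
    }
}

Alternatively, a consumer in an existing group can rewind explicitly with seekToBeginning() once partitions have been assigned.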
Ah, I see. Thanks!
Closing the thread.