新的Consumer API报错。Error reading field 'brokers': Error reading field 'host': Error reading string of length 28271, only 593 bytes available

克里斯蒂安 发表于: 2016-07-13   最后更新时间: 2016-07-14 09:37:37   13,726 游览

错误如下:

Exception in thread "Thread-0" org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'brokers': Error reading field 'host': Error reading string of length 28271, only 593 bytes available
at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:73)
at org.apache.kafka.clients.NetworkClient.parseResponse(NetworkClient.java:380)
at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:449)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:269)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:179)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:973)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
at KafkaNew.Consumer$ConsumerThread.run(Consumer.java:40)

代码就是新的consumer api

 Properties props = new Properties();
     props.put("bootstrap.servers", "localhost:9092");
     props.put("group.id", "test");
     props.put("enable.auto.commit", "true");
     props.put("auto.commit.interval.ms", "1000");
     props.put("session.timeout.ms", "30000");
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
     consumer.subscribe(Arrays.asList("foo", "bar"));
     while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
         for (ConsumerRecord<String, String> record : records)
             System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
     }

用旧的高级api是可以取数据的,新的就报错。
求大神指点。

发表于 2016-07-13

生产kafka版本多少?

半兽人 -> 半兽人 8年前

最新的kafka客户端,要用最新的集群

我记得集群中的好像是kafka-2.1.0-1.2.什么是这个,,好像不是最新的吧

我刚才看了下,是kafka-2.0.1-1.2.0.1.p0.5,这不是最新版吗?

没有2.0.1的版本吧,https://www.orchome.com/66
另外,如果客户端API使用的版本是0.10.0.0的话,那你就要用最新的集群 kafka 2.10-0.10.0.0

集群上kafka的目录就是这样显示的,官网有个kafka_2.10-0.10.0.0.tgz 这个是最新的吗?

恩,是最新的

那我安装这个试试。现在集群用旧的高级api是可以访问的。

新版支持旧版的scala的访问,现在主要改的就是0.9后 java的客户端,你先试下吧

嗯,我先试试换新的行不行。反正旧的api可以读取里面的消息。新的消费者api报的错好像就是说读取broker和host的时候出错了,现在要换最新的写法,调了一天都不行。有问题再请教你,谢谢了

大神,能请教你几个问题吗?查了好久没查到。
1.新的consumer启动后,进行了负载均衡,请问代码中有木有,是如何控制他的负载均衡?
2.C0自己消费topic的3个分区,C1进来后,对于C0已经读过的分区,C1应该是继续跟着offset读取的吧?不是从头的吧?
3.我看到有的文章说0.9会有一个控制器,由他来检测consumer和broker的变化,避免总是负载均衡,现在有这个控制器了吗?
我是新手,请见谅。。。谢谢了!

1.集群会自己进行负载均衡。
2.不会从头。
3.一个是集群自动负载均衡,第二个是消费指定的分区,而不进行负载均衡。

这个问题解决了吗?如果有新的问题,重新提问即可。

我现在先用着旧的api,等集群更新后再试试。谢谢。

你的答案

查看kafka相关的其他问题或提一个您自己的问题