如果不正常,应该怎么优化?
这是我的代码
private ConsumerConnector createConsumer() {
Properties properties = new Properties();
properties.put("zookeeper.connect", "192.168.1.1:2188");
properties.put("group.id", "1");
//properties.put("serializer.class","kafka.serializer.StringEncoder");
properties.put("serializer.class", "com.util.ObjectDecoder");
// properties.put("auto.commit.enable", "false");
properties.put("zookeeper.session.timeout.ms", "15000");
// 指定多久消费者更新offset到zookeeper中
// properties.put("zookeeper.sync.time.ms", "2000");
// 必须要加,如果要读旧数据
properties.put("auto.offset.reset", "smallest");
// properties.put("auto.offset.reset", "largest");
return Consumer.createJavaConsumerConnector(new ConsumerConfig(
properties));
}
加入一个consumer时,kafka集群是要重新balance消费者。这个时间是有的。
你的答案