兄弟 你得提供一些代码呀
Properties props = new Properties(); props.put("max.partition.fetch.bytes", "1048576"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("auto.offset.reset", "latest"); props.put("request.timeout.ms", "100000"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("group.id", "DataTMR_hbase"); props.put("enable.auto.commit", "false"); props.put("bootstrap.servers", "xxxxxx"); Consumer<string,string> consumer = new KafkaConsumer<string,string>(props); consumer.subscribe(topicnamelist); while(true){ ConsumerRecords<string,string> consumerRecords = consumer.poll(1000); } }
代码主要就是这段
还有个问题想请教下 ,一个topic有三个分区,我创建了一个consumer去poll数据,然后放进queue队列里面,多线程消费这个队列。我想改成三个consumer去消费,如果用subscribe订阅,会不会造成reblance
poll的时间也太长了,关注下手动提交offset的逻辑,是这个导致的,你可以先改成自动提交测试一下。参考:https://www.orchome.com/451
刚启动subscribe的时,消费者组内进行rebalance,不会影响你的性能的。
好的 我先试下 有疑问在请教你
找不到想要的答案?提一个您自己的问题。
0 声望
这家伙太懒,什么都没留下
兄弟 你得提供一些代码呀
Properties props = new Properties(); props.put("max.partition.fetch.bytes", "1048576"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("auto.offset.reset", "latest"); props.put("request.timeout.ms", "100000"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("group.id", "DataTMR_hbase"); props.put("enable.auto.commit", "false"); props.put("bootstrap.servers", "xxxxxx"); Consumer<string,string> consumer = new KafkaConsumer<string,string>(props); consumer.subscribe(topicnamelist); while(true){ ConsumerRecords<string,string> consumerRecords = consumer.poll(1000); } }
代码主要就是这段
还有个问题想请教下 ,一个topic有三个分区,我创建了一个consumer去poll数据,然后放进queue队列里面,多线程消费这个队列。我想改成三个consumer去消费,如果用subscribe订阅,会不会造成reblance
poll的时间也太长了,关注下手动提交offset的逻辑,是这个导致的,你可以先改成自动提交测试一下。
参考:https://www.orchome.com/451
刚启动subscribe的时,消费者组内进行rebalance,不会影响你的性能的。
好的 我先试下 有疑问在请教你
你的答案