consumer.seek(topicPartition, offset);
Has anyone used this?
java.lang.IllegalStateException: No current assignment for partition HighAvailabilityTest-1
at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:231)
at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:256)
at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1134)
at com.masai.kafka.ConsumerOnce.run(ConsumerOnce.java:54)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
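The partition definitely exists. What is this "assignment", and is there something I need to configure? Any help appreciated.
Cause: seek() only works on a partition that is currently assigned to this consumer. When you specify the consume position manually, you cannot rely on the consumer group's automatic load balancing to hand you the partition first, so you have to register the partition yourself with assign() before you can seek and consume.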
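The check behind the exception can be modeled in a few lines. This is a toy sketch, not Kafka's source: ToyConsumer and its methods are hypothetical names, and the model assumes that subscribe() only records interest in a topic while the actual partition assignment arrives later (in real Kafka, during poll()).

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy model only: hypothetical classes, not part of kafka-clients.
class AssignmentDemo {
    public static void main(String[] args) {
        // Case 1: subscribe, then seek before any partition has been assigned.
        ToyConsumer subscribed = new ToyConsumer();
        subscribed.subscribe("HighAvailabilityTest");
        try {
            subscribed.seek("HighAvailabilityTest-1", 42L);
        } catch (IllegalStateException e) {
            System.out.println("subscribe then seek: " + e.getMessage());
        }

        // Case 2: assign the partition manually, then seek.
        ToyConsumer assigned = new ToyConsumer();
        assigned.assign("HighAvailabilityTest-1");
        assigned.seek("HighAvailabilityTest-1", 42L);
        System.out.println("assign then seek: position = "
                + assigned.position("HighAvailabilityTest-1"));
    }
}

class ToyConsumer {
    private final Set<String> subscription = new HashSet<>();
    private final Set<String> assignment = new HashSet<>();
    private final Map<String, Long> positions = new HashMap<>();

    // Records interest in a topic; the assignment set stays empty.
    void subscribe(String topic) {
        subscription.add(topic);
    }

    // Manual assignment takes effect immediately.
    void assign(String partition) {
        assignment.add(partition);
    }

    // Mirrors the failing check: seeking an unassigned partition is an error.
    void seek(String partition, long offset) {
        if (!assignment.contains(partition)) {
            throw new IllegalStateException(
                    "No current assignment for partition " + partition);
        }
        positions.put(partition, offset);
    }

    // Returns the last offset set by seek, or null if none was set.
    Long position(String partition) {
        return positions.get(partition);
    }
}
```

In this model the first case fails with the same message as the stack trace, and the second succeeds because the partition is in the assignment set before seek is called.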
Properties props = new Properties();
props.put("bootstrap.servers", bootstrapServers);
props.put("group.id", groupId);
props.put("enable.auto.commit", false);
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("max.poll.records", maxPollRecords);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//consumer.subscribe(Arrays.asList(topic));
TopicPartition p = new TopicPartition(topic, 2);
consumer.assign(Arrays.asList(p));
consumer.seek(p, 488430);
//while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
    String V = record.value();
}
// }
No error now, but I still get no data back.
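One plausible cause, offered as a guess rather than a diagnosis: with the while loop commented out, poll is called only once, and a single 100 ms poll can return an empty batch while the consumer is still connecting and fetching. The sketch below retries a poll-like call until it returns something or a retry limit is reached. pollUntilNonEmpty is a hypothetical helper, and a fake in-memory source stands in for the consumer so the example runs without a broker.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;
import java.util.function.Supplier;

class PollRetryDemo {
    // Calls poll at least once, then keeps calling it while the batch is
    // empty, up to maxAttempts calls in total. Returns the last batch seen.
    static <T> T pollUntilNonEmpty(Supplier<T> poll, Predicate<T> isEmpty, int maxAttempts) {
        T batch = poll.get();
        for (int attempt = 1; attempt < maxAttempts && isEmpty.test(batch); attempt++) {
            batch = poll.get();
        }
        return batch;
    }

    public static void main(String[] args) {
        // Fake source standing in for consumer.poll: two empty batches, then data.
        Iterator<List<String>> fake = Arrays.asList(
                Collections.<String>emptyList(),
                Collections.<String>emptyList(),
                Arrays.asList("v1", "v2")).iterator();

        List<String> records = pollUntilNonEmpty(fake::next, List::isEmpty, 10);
        System.out.println(records);
    }
}
```

With the real consumer from the post, the call would look roughly like pollUntilNonEmpty(() -> consumer.poll(100), ConsumerRecords::isEmpty, 50). If the batch is still empty after many attempts, it is also worth checking that offset 488430 actually exists in partition 2.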
"No current assignment for partition": could it be that the partition is already taken by another consumer, so it cannot be assigned to you?