嗯,确实是集群问题。
环境中有几个broker,我不停的切换(停止和启动),导致一些主题错乱了。
将级别调整到ERROR,重启后情况有改观了。
拉取的条数跟你消息大小有关,你把poll时间调成100,提交改成自动试试
好的。
但是,poll的时候,测试发现大概每轮只有500条,就停顿0.5-1秒。并不像您说的“大概一次拉取2000-4000条”。
消费者拉取主函数:
public void consume(int threadNumber) {
if (ConsumerThreadTools.mapListeners.isEmpty()) {
logger.error("消息处理对象为空,不进行消费.........返回");
return;
}
if(executors == null) {
executors = new ThreadPoolExecutor(threadNumber, threadNumber, 0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
}else {
}
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(500);
System.out.println(JSONObject.toJSONString(records));
if (!records.isEmpty()) {
executors.submit(new ConsumerThreadWorker(records, offsets));
}
commitOffsets();
}
} catch (WakeupException e) {
// swallow this exception
} finally {
commitOffsets();
consumer.close();
}
}
问题同:
https://www.orchome.com/805
引用自 solo 答复 原野
“要在zk里面把__consumer_offsets节点和下面的所有节点都删掉,再重新启动集群做消费”
一 .以json文件的形式,来进行添加:
1、查看 主题的信息
bin/kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 --topic myTopic1
确定分区数
2、可以指定分区绑定副本(前提是副本id的broker服务要启动了) 到bin目录下,添加文件 increase-replication-factor.json
[root@localhost bin]# vim increase-replication-factor.json
{"version": 1, "partitions": [
{
"topic": "AA_topic",
"partition": 0,
"replicas": [
4,
5
]
},
{
"topic": "AA_topic",
"partition": 1,
"replicas": [
4,
5
]
}
]
}
其中 replicas 填写的值,就是 kafka 的对应的broker.id 【在kafka的conf目录的 server.properties 文件里能找到】。
3、执行命令
[root@localhost bin]# ./kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --execute
4、再看主题的信息
bin/kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 --topic AA_topic
我这边操作记录,试下能不能帮你解决问题