Problem description
1. The Kafka consumer process is still alive, but it is blocked and no longer consumes data.
2. Checking on the Kafka brokers shows that some partitions have no consumer attached.
3. We are using the high-level consumer API.
Consumer code
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ThreadPoolExecutor;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

// Globals, AHandler, BHandler and CHandler are application classes not shown here.
public class KafkaConsumerThread implements Runnable {

    private final Logger log = LoggerFactory.getLogger(KafkaConsumerThread.class);

    private ThreadPoolExecutor executor;
    private ConsumerConnector connector = null;
    private KafkaStream<byte[], byte[]> kafkaStream = null;
    private int handlerType;
    private volatile boolean run = true;

    public KafkaConsumerThread(String topic, int partitions, ThreadPoolExecutor executor, int handlerType) {
        this.executor = executor;
        ConsumerConfig config = createConsumerConfig(topic);
        connector = Consumer.createJavaConsumerConnector(config);
        Map<String, Integer> topicCountMap = new HashMap<>();
        topicCountMap.put(topic, partitions); // request one stream (consumer thread) per partition
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = connector.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> kafkaStreams = consumerMap.get(topic);
        kafkaStream = kafkaStreams.get(0); // only the first stream is kept and consumed
        this.handlerType = handlerType;
    }

    private ConsumerConfig createConsumerConfig(String topic) {
        Properties props = new Properties();
        // ZooKeeper connect string; multiple hosts may be listed, separated by ","
        props.put("zookeeper.connect", Globals.CONSUMER_HOST);
        props.put("zookeeper.connection.timeout.ms", Globals.CON_TIMEOUT);
        props.put("group.id", "GROUP_" + topic);
        props.put("zookeeper.session.timeout.ms", "30000");
        props.put("zookeeper.sync.time.ms", "4000");
        props.put("socket.receive.buffer.bytes", String.valueOf(10 * 1024 * 1024));
        props.put("fetch.message.max.bytes", String.valueOf(10 * 1024 * 1024));
        props.put("queued.max.message.chunks", "5");
        props.put("auto.commit.interval.ms", "5000");
        props.put("rebalance.backoff.ms", "5000");
        props.put("rebalance.max.retries", "60");
        return new ConsumerConfig(props);
    }

    @Override
    public void run() {
        ConsumerIterator<byte[], byte[]> msgIte;
        MessageAndMetadata<byte[], byte[]> mam;
        while (run) {
            try {
                msgIte = kafkaStream.iterator();
            } catch (Exception ex) {
                log.error("Consumer kafka biz data Error !!", ex);
                continue;
            }
            try {
                while (msgIte.hasNext()) {
                    mam = msgIte.next(); // the received message
                    Runnable runnable = null;
                    if (handlerType == 1) {
                        runnable = new AHandler(mam.message());
                    } else if (handlerType == 2) {
                        runnable = new BHandler(mam.message());
                    } else if (handlerType == 3) {
                        runnable = new CHandler(mam.message());
                    } else {
                        return; // unknown handler type: exit the consumer loop
                    }
                    executor.execute(runnable);
                    log.info("Consumer data success! topic:{}, partition:{}, offset:{}",
                            mam.topic(), mam.partition(), mam.offset());
                    if (log.isDebugEnabled()) {
                        log.debug("pool size:{}, queue size : {}", executor.getPoolSize(), executor.getQueue().size());
                    }
                }
            } catch (Exception e) {
                log.error("Parse biz kafka data Error !!", e);
            }
        }
    }

    public void stop() {
        run = false;
        if (connector != null) {
            connector.shutdown();
        }
    }
}
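For reference, createMessageStreams hands back one KafkaStream per requested thread, while the constructor above keeps only kafkaStreams.get(0). Below is a minimal, illustrative sketch of the usual one-thread-per-stream pattern for the 0.8 high-level consumer API; the pool sizing and the handle() method are placeholders, not the project's actual AHandler/BHandler/CHandler dispatch.

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class AllStreamsConsumer {

    // Drain every stream returned by createMessageStreams on its own thread,
    // instead of keeping only the first one.
    public static void consumeAllStreams(ConsumerConnector connector, String topic, int partitions) {
        Map<String, Integer> topicCountMap = new HashMap<>();
        topicCountMap.put(topic, partitions);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                connector.createMessageStreams(topicCountMap);

        ExecutorService streamPool = Executors.newFixedThreadPool(partitions);
        for (final KafkaStream<byte[], byte[]> stream : consumerMap.get(topic)) {
            streamPool.submit(new Runnable() {
                @Override
                public void run() {
                    // hasNext() blocks until a message arrives on this stream
                    ConsumerIterator<byte[], byte[]> it = stream.iterator();
                    while (it.hasNext()) {
                        MessageAndMetadata<byte[], byte[]> mam = it.next();
                        handle(mam.message());
                    }
                }
            });
        }
    }

    private static void handle(byte[] message) {
        // placeholder: real code would dispatch to the appropriate handler
    }
}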
Current workaround
Restarting the consumer process restores consumption.
What I am hoping for
1. What is the root cause, and what are the steps to reproduce it?
2. How can it be fixed at the root?
3. Are there any suggestions for improving this consumer code?
This looks like Kafka 0.8, which is very old.
What kind of thread pool are you using? Could the submission of tasks to the pool be blocking? Please post the complete code.
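To illustrate the point about blocking submission: whether executor.execute() can stall depends on the pool's queue and rejection policy. The sketch below is illustrative only (the queue capacity, pool size and rejection handler are made-up assumptions, not the asker's actual configuration); with a bounded queue and a rejection handler that does a blocking put, the thread calling execute() waits whenever downstream handlers fall behind.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BlockingPoolExample {

    public static ThreadPoolExecutor newBlockingPool() {
        // Bounded queue: once it is full, new tasks go through the rejection handler.
        ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(100);

        RejectedExecutionHandler blockWhenFull = new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                try {
                    // Blocking put: the calling thread (here, the Kafka consumer loop)
                    // waits until a worker frees a queue slot. If the handlers are stuck,
                    // the loop stops calling hasNext()/next() and the partition looks idle.
                    executor.getQueue().put(r);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        };

        return new ThreadPoolExecutor(4, 4, 0L, TimeUnit.MILLISECONDS, queue, blockWhenFull);
    }
}

By contrast, with the default AbortPolicy a full queue makes execute() throw RejectedExecutionException, which the outer catch in run() would only log, so the loop would keep going but drop messages rather than hang.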