补充一下最后的解决办法,也许会有人用到
我们的Kafka监控用的是kafka manager,是由雅虎公司开源发布的,现已更名为CMAK,
该监控系统提供了一套API,用以获取kafk集群的一些信息,
其中便有获取所有消费组的接口,定义如下:
GET /api/status/:cluster/consumersSummary 其中:cluster为kafka客户端名称
使用代码调用API的话,需要加上Http的基础验证,使用方法百度一下即可
其他详细API地址为:https://github.com/yahoo/CMAK/blob/master/conf/routes
你的消费者组不活跃了,自然就找不到了:
public List<String> activeConsumerByTopic(String topicName) {
List<String> lists = new ArrayList<>();
try (AdminClient client = KafkaAdminFactory.getInstance()) {
try {
// get all consumer groupId
List<String> groupIds = client.listConsumerGroups().all().get().stream().map(s -> s.groupId()).collect(Collectors.toList());
// Here you get all the descriptions for the groups
Map<String, ConsumerGroupDescription> groups = client.describeConsumerGroups(groupIds).all().get();
for (final String groupId : groupIds) {
ConsumerGroupDescription descr = groups.get(groupId);
// find if any description is connected to the topic with topicName
Optional<TopicPartition> tp = descr.members().stream().
map(s -> s.assignment().topicPartitions()).
flatMap(coll -> coll.stream()).
filter(s -> s.topic().equals(topicName)).findAny();
if (tp.isPresent()) {
// you found the consumer, so collect the group id somewhere
lists.add(descr.groupId());
}
}
} catch (InterruptedException | ExecutionException e) {
throw new IllegalStateException(e);
}
}
return lists;
}
你可以安装一个我重写的kafka监控,看看是否可以获取到,参考:KafkaOffsetMonitor raft版:监控消费者和延迟的队列
如果能达到你的预期,参考核心功能代码:
https://github.com/orchome/KafkaOffsetMonitor/blob/main/src/main/java/www/orchome/com/kafka/core/KafkaService.java
这个命令有使用,但是正如问题描述的,我这里是查不到消费组下topic的信息的。不知道是否是因为我这里消费时,是用consumer.assign()方法手动指定消费位置,导致kafka无法管理消费组导致的
看了下,是你的消息堵塞的太严重了,来不及发送,越堆越多,导致的oom。
减少延迟,因为你的消息量级已经够了,不要等待0.1秒了,改成1
或者0
。
prop.put(ProducerConfig.LINGER_MS_CONFIG, 100);// 该参数是控制消息发送延时的 默认参数是0
batch.size,虽然加大了,但是你带宽不够,来不及发送。
prop.put(ProducerConfig.BATCH_SIZE_CONFIG, 1048576);// 默认参数事16384即16KB
acks=0
acks=0 如果设置为0,那么生产者将不等待任何消息确认。消息将立刻添加到socket缓冲区并考虑发送。在这种情况下不能保障消息被服务器接收到。并且重试机制不会生效(因为客户端不知道故障了没有)。每个消息返回的offset始终设置为-1。
更多生产者配置,参考:Kafka Producer配置
其实就是消息太多,而来不及发送,导致的。
很大可能是kafka服务器节点的硬件(如网络)到达了瓶颈,你可以通过增加分区数/或者kafka节点数来分摊压力,提高发送速度,减少堵塞的消息量。