Multiple instances, offsets managed manually and stored in Redis. On startup the application reads the offset, assigns the partition, seeks to that offset, and consumes with poll. Everything was fine during initial debugging, but when I checked a few days later, poll was failing to fetch messages.
Kafka version: 0.10.1.1, multi-instance deployment, manually managed offsets.
Multi-threaded consumption; each thread consumes a fixed partition:
@Override
public void run() {
    consumer.assign(Arrays.asList(topicPartition));
    String key = StartInit.REDIS_KEY_MSG + "topic:" + topicPartition.topic() + "partition:" + topicPartition.partition();
    String lockKey = REDIS_LOCK_KEY + "topic:" + topicPartition.topic() + "partition:" + topicPartition.partition();
    long position = getOffset(key);
    log.info("topic [{}], partition [{}] thread start, consume begin offset is [{}] ...", topicPartition.topic(), topicPartition.partition(), position);
    consumer.seek(topicPartition, position);
    try {
        boolean running = true;
        while (running) {
            long offsetNewest;
            try {
                ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
                // process the whole batch inside one transaction
                CustomTransactionCallback customTransactionCallback = new CustomTransactionCallback(0L, records);
                transactionTemplate.execute(customTransactionCallback);
                offsetNewest = customTransactionCallback.getOffsetNewest();
            } catch (Exception e) {
                // pass the throwable directly; a "{}" placeholder does not format an exception argument
                log.error("find exception ...", e);
                continue;
            }
            consumer.commitSync();
            log.info("topic [{}], partition [{}], offsetNewest is [{}] ...", topicPartition.topic(), topicPartition.partition(), offsetNewest);
            boolean locked = redisManager.lock(lockKey, 180, 3);
            if (locked) {
                try {
                    long offsetDb = getOffset(key);
                    if (offsetDb < offsetNewest) {
                        // update the checkpoint in Redis
                        stringRedisTemplate.opsForValue().set(key, String.valueOf(offsetNewest));
                    }
                } finally {
                    redisLockService.unlock(lockKey);
                }
            }
        }
    } finally {
        consumer.close();
    }
}
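The getOffset(key) helper is not shown in the snippet above. A minimal sketch of what it presumably does, assuming the checkpoint is stored in Redis as a plain string (the helper name and the start-from-0 default are my assumptions, not the author's code):

```java
// Hypothetical sketch of the getOffset(key) parsing step: it reads the
// checkpoint string from Redis and falls back to 0 when none exists yet.
public class OffsetInitSketch {

    /** Parses the raw string value read from Redis into a starting offset. */
    public static long parseStoredOffset(String raw) {
        if (raw == null || raw.isEmpty()) {
            return 0L; // no checkpoint yet: start from offset 0 (assumed default)
        }
        return Long.parseLong(raw.trim());
    }
}
```

In the real consumer this value is what gets passed to consumer.seek(topicPartition, position).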
Thanks in advance. The log shows:

2019-12-13 09:34:24.613 [kafka_msg_consumer_2] WARN o.apache.kafka.clients.consumer.internals.Fetcher - Unknown error fetching data for topic-partition topic-mescenter-mes-0
2019-12-13 09:34:28.034 [kafka_msg_consumer_1] WARN o.apache.kafka.clients.consumer.internals.Fetcher - Unknown error fetching data for topic-partition topic-mescenter-mes-1
2019-12-13 09:34:28.066 [kafka_msg_consumer_1] WARN o.apache.kafka.clients.consumer.internals.Fetcher - Unknown error fetching data for topic-partition topic-mescenter-mes-1
2019-12-13 09:34:28.081 [kafka_msg_consumer_0] WARN o.apache.kafka.clients.consumer.internals.Fetcher - Unknown error fetching data for topic-partition topic-mescenter-mes-2
2019-12-13 09:34:28.081 [kafka_msg_consumer_1] WARN o.apache.kafka.clients.consumer.internals.Fetcher - Unknown error fetching data for topic-partition topic-mescenter-mes-1
2019-12-13 09:34:28.096 [kafka_msg_consumer_2] WARN o.apache.kafka.clients.consumer.internals.Fetcher - Unknown error fetching data for topic-partition topic-mescenter-mes-0
2019-12-13 09:34:28.113 [kafka_msg_consumer_1] WARN o.apache.kafka.clients.consumer.internals.Fetcher - Unknown error fetching data for topic-partition topic-mescenter-mes-1
2019-12-13 09:34:28.113 [kafka_msg_consumer_0] WARN o.apache.kafka.clients.consumer.internals.Fetcher - Unknown error fetching data for topic-partition topic-mescenter-mes-2
2019-12-13 09:34:28.144 [kafka_msg_consumer_1] WARN o.apache.kafka.clients.consumer.internals.Fetcher - Unknown error fetching data for topic-partition topic-mescenter-mes-1
2019-12-13 09:34:28.144 [kafka_msg_consumer_0] WARN o.apache.kafka.clients.consumer.internals.Fetcher - Unknown error fetching data for topic-partition topic-mescenter-mes-2
2019-12-13 09:34:28.159 [kafka_msg_consumer_2] WARN o.apache.kafka.clients.consumer.internals.Fetcher - Unknown error fetching data for topic-partition topic-mescenter-mes-0
.....
1. Make sure the Kafka client version matches the broker version (essential).
2. It may be a bug in your Kafka version, which is one of the buggiest releases and includes this error. See: https://issues.apache.org/jira/browse/KAFKA-6292
3. If neither of the above applies, browse related issues here: https://issues.apache.org/jira/browse/KAFKA-6292?jql=ORDER%20BY%20lastViewed%20DESC
I checked, and the broker and client versions were indeed different. Thanks a lot!
One more question: in the code above, if the business logic throws an exception while processing a polled batch, what is the best way to handle it? As written, the logic simply discards that batch, which surely isn't right? Please advise.
Keep business logic out of the Kafka client, otherwise things get very complicated. Let the business layer handle failures itself: it decides whether to throw the message back into Kafka or to raise an alert.
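That separation can be sketched as a standalone decision helper on the business side (the class name, retry budget, and the two outcomes are all assumptions; the actual requeue would just be a plain KafkaProducer send back to the topic or a retry topic):

```java
// Hypothetical business-side failure policy: retry a few times by throwing the
// message back into Kafka, then give up and alert (or dead-letter) instead.
public class FailurePolicySketch {

    static final int MAX_ATTEMPTS = 3; // assumed retry budget

    /** Returns what the business layer should do with a record that failed. */
    public static String decide(int attemptsSoFar) {
        if (attemptsSoFar < MAX_ATTEMPTS) {
            return "REQUEUE"; // send the record back to the topic / a retry topic
        }
        return "ALERT";       // stop retrying; page someone or dead-letter it
    }
}
```

The point is that this policy lives entirely outside the consume loop, so the Kafka client never has to know about business failures.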
https://www.orchome.com/2046
Right now offsets are managed manually: I record and control the consume position myself, so committing offsets to Kafka no longer has any effect; it doesn't matter whether I commit or not. Doesn't that conflict with your reply? Please clarify.
I understand. You need to commit the message first and then hand it to the business layer. Whether the business layer errors out is the business layer's problem; your only job is to make sure your commit is correct.
Also, even though the Kafka commit has no effect, the business layer has already processed the message, so you can end up with duplicate consumption (you may already be guarding against that with Redis). If you want to recover, destroy the consumer, create a new one, and re-pull the messages.
Thanks! So is this the right way to think about it: after the consumer polls a batch, each message is handed straight off to other threads for processing, and once the hand-off is done the Redis offset is updated immediately (the equivalent of committing the offset), so consumption and business logic are decoupled across threads?
Yes.
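That hand-off pattern can be sketched without a broker, using a plain List to stand in for one poll() batch and list indices to stand in for record offsets (every name here is hypothetical):

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hand-off sketch: the consume loop dispatches each record to a worker pool,
// then records the newest offset in Redis -- that Redis write IS the commit.
public class HandOffSketch {

    /** Dispatches a polled batch to workers and returns the newest offset. */
    public static long processBatch(List<String> records) {
        ExecutorService workers = Executors.newFixedThreadPool(2);
        long newest = -1L;
        for (int i = 0; i < records.size(); i++) {
            final String value = records.get(i);
            newest = i; // in real code: record.offset()
            workers.submit(() -> {
                try {
                    // business logic for `value` runs here; a failure is the
                    // worker's problem (retry, alert, or re-produce) and must
                    // never block the consume loop
                } catch (RuntimeException e) {
                    // swallow so one bad record cannot kill a worker thread
                }
            });
        }
        workers.shutdown();
        try {
            workers.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return newest; // caller writes this to Redis, i.e. the effective commit
    }
}
```

In the real loop, the consumer thread would call poll, hand the batch off like this, write the returned offset to Redis, and immediately poll again.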
I implemented something similar with spring-kafka 2.2.4: enable-auto-commit = false and no offset commits at all. After running for a while, the consumer stopped consuming, and I saw this in the log: org.apache.kafka.clients.FetchSessionHandler:394 - [Consumer clientId=consumer-2, groupId=ctk_rk_ad_report_high] Node 2 sent an invalid full fetch response with omitted=. Have you run into anything like this?