kafka作为任务队列,如何配置可以支持处理长任务?

jksaafe 发表于: 2022-01-27   最后更新时间: 2022-02-08 09:28:09   1,468 游览

问题

用户在前端提交计算任务,后端将计算任务提交到kafka(全局只有一个topic,多个分区),告诉前端任务提交成功。后端有一个线程一直不断从kafka拉取任务处理,但是这个任务可能比较大(达数小时),只有一个消费者的情况下,可以正常执行任务(执行了4个小时),最终也commit了,但是多个消费者情况下(同属于一个消费者组),一个消费者拉取任务执行一段时间后,任务又被其他的消费者拉取执行了,过一会儿又会转移,就这样任务被踢来踢去,尝试了更改消费者配置,但没有解决,也是不熟悉参数之间的约束关系。由于线上是有多个实例的,所以这个问题必须解决。

临时处理方案

因为单消费者消费任务目前看起来是没有问题的,我们用了redis实现一个锁,保证只有一个实例(消费者)消费任务,实际可行。如果kafka的话有没有更好的处理方式呢?比如更改配置让它支持长任务处理

发表于 2022-01-27
添加评论

过一会转移应该报的是这个错误

Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment;

原因参考:https://www.orchome.com/9964

但是,你一条消息处理4个小时,kafka是批量拉取的消息,一条消息,消费者默认阻塞是30秒,超过就报错了,导致你反复拉取该消息。

最后commit也成功了,所以我很疑惑你是怎么成功的,是手动commit,还是自动(会丢失一些消息,因为是按批次提交的,还没来得及处理,但是报错了)。

你可以考虑使用pause和resume的方式,加手动提交的组合,来根据你的业务设计吧,参考:kafka消费者Java客户端

jksaafe -> 半兽人 2年前

感谢大佬回复。我是手动提交的,每次我只拉取一个任务处理,不是批量

jksaafe -> jksaafe 2年前

另外,我如果设置了比较长的超时时间(数小时),会不会其他的意想不到的问题呢?换句话说kafka是不是不适合这类业务场景呢?

半兽人 -> jksaafe 2年前

你拉取到任务,先commit,就不会被其他的消费者拿到了。
顺序:

  1. 拉取消息
  2. 手动commit
  3. 执行pause
  4. 处理消息
  5. 处理完成后,resume。
jksaafe -> 半兽人 2年前

这样的话,假如我任务失败了,岂不是不能重试了?(我没有用过pause和resume,我查了下用法)

jksaafe -> jksaafe 2年前

而且假设我这个实例挂了,那就没法执行resume了

半兽人 -> jksaafe 2年前

失败就报错,做其他处理,不要让kafka侵入到你的业务。

jksaafe -> 半兽人 2年前

我们只是依赖kafka来保证我们的任务一定被执行成功,如果只是可能中途失败,我们会sleep一段时间再重新执行(任务执行幂等),上面的的pause&resume方案是可以解决这个问题的。但是如果实例挂了(内部或外部因素),这个方案就行不通了。

半兽人 -> jksaafe 2年前

实例挂了,那难道就不能恢复吗?

你的答案

查看kafka相关的其他问题或提一个您自己的问题