c++ 调用kafka,rd_kafka_conf_set接口设置"auto.offset.reset"属性时遇到问题,场景如下: 用一个进程作为生产者,一直在生产发送topic的消息。 生产者消息没有被消费,当我消费者连接成功时,将会接收所有以往没有被消费过的消息,可我的使用场景是只需要接收当前时间产生的消息,之前的需要忽略掉。 我该设置的是服务器配置还是代码???? 求大佬解答!!只需要接收当前时间产生的消息,之前的需要忽略掉,不管它有没有被消费过。
你想像一下订报纸,在你没有订阅之前,报社根本就不知道你,当你订阅的这一刻,新的报纸才会发给你,老的报纸是不会给你的。这时,你消费者停掉了,发报员依然会为你保留报纸,因为你已经在报社有名单了。这个名单是根据是
group.id
确认的,如果你想每次都是最新的,可以每次都用新的group.id
。另外要设置成
auto.offset.reset=latest
,解释如下:但是,这个
latest
并不代表这一时刻的消息,如果你已经在报社有名单了,那就是基于报社记录你最后一次拿取报纸时间,把这些‘报纸’都发给你。大佬的回答很生动形象,瞬间理解了很多,我试了一下,现场和您说的是一样的。
但现在需要的是,完全不需要老的报纸,每次用新的group.id感觉不太好,是否有接口能在报社名单上消除这个id,我下次再用,它就认为是新的id。将不会给我老的报纸,或者配置中有没有设置,告诉报社,即使名单上有了这个
group.id
,也不用给他旧的 报纸,它来了直接给最新的。是可以删除group.id,可参考:
另外一种方式是重置
offset
,分2步:时间
来获取到offset
offset
c++的我没用过,java的关键的2个命令如下,可以帮助你找找对应c++的代码:
consumer.offsetsForTimes
// 通过时间,定位offsetconsumer.seek
// 重新定位offset找是找到了,可是不管怎么设置都没生效,难受呀,不知道问题在哪里,还需要设置其它什么,我的想法是在订阅前把这个设置完成,代码如下:
bool MessageQueueConsumer::subscribeAction(){ rd_kafka_resp_err_t err; rd_kafka_topic_partition_list_t *subscription; subscription = rd_kafka_topic_partition_list_new(_topocList.size()); pthread_mutex_lock(&_mutex); for(int count = 0; count < _topocList.size(); count ++){ rd_kafka_topic_partition_list_add(subscription,_topocList[count].c_str(), count); } pthread_mutex_unlock(&_mutex); for(int count = 0; count < subscription->cnt; count ++){ subscription->elems[count].offset = RD_KAFKA_OFFSET_END; } rd_kafka_seek_partitions(_handler,subscription, 1000); err = rd_kafka_subscribe(_handler, subscription); if (err) { //错误日志记录 todo rd_kafka_topic_partition_list_destroy(subscription); return false; } rd_kafka_topic_partition_list_destroy(subscription); return true; }
没看到你通过时间获取offset的逻辑。
我看源码中例子是设置了
offset = RD_KAFKA_OFFSET_BEGINNING
就可以了,可是我测试的时候不太行,这是源码中对RD_KAFKA_OFFSET_END
字段的注解:#define RD_KAFKA_OFFSET_BEGINNING \ -2 /**< Start consuming from beginning of \ * kafka partition queue: oldest msg */ #define RD_KAFKA_OFFSET_END \ -1 /**< Start consuming from end of kafka \ * partition queue: next msg */ #define RD_KAFKA_OFFSET_STORED \ -1000 /**< Start consuming from offset retrieved \ * from offset store */ #define RD_KAFKA_OFFSET_INVALID -1001 /**< Invalid offset */
我觉得你还是得通过时间获取offset的位置,因为客户端是无法得知当前offset的位置。
你的答案