我觉得你还是得通过时间获取offset的位置,因为客户端是无法得知当前offset的位置。
我看源码中例子是设置了offset = RD_KAFKA_OFFSET_BEGINNING
就可以了,可是我测试的时候不太行,这是源码中对RD_KAFKA_OFFSET_END
字段的注解:
#define RD_KAFKA_OFFSET_BEGINNING \
-2 /**< Start consuming from beginning of \
* kafka partition queue: oldest msg */
#define RD_KAFKA_OFFSET_END \
-1 /**< Start consuming from end of kafka \
* partition queue: next msg */
#define RD_KAFKA_OFFSET_STORED \
-1000 /**< Start consuming from offset retrieved \
* from offset store */
#define RD_KAFKA_OFFSET_INVALID -1001 /**< Invalid offset */
没看到你通过时间获取offset的逻辑。
找是找到了,可是不管怎么设置都没生效,难受呀,不知道问题在哪里,还需要设置其它什么,我的想法是在订阅前把这个设置完成,代码如下:
bool MessageQueueConsumer::subscribeAction(){
rd_kafka_resp_err_t err;
rd_kafka_topic_partition_list_t *subscription;
subscription = rd_kafka_topic_partition_list_new(_topocList.size());
pthread_mutex_lock(&_mutex);
for(int count = 0; count < _topocList.size(); count ++){
rd_kafka_topic_partition_list_add(subscription,_topocList[count].c_str(),
count);
}
pthread_mutex_unlock(&_mutex);
for(int count = 0; count < subscription->cnt; count ++){
subscription->elems[count].offset = RD_KAFKA_OFFSET_END;
}
rd_kafka_seek_partitions(_handler,subscription, 1000);
err = rd_kafka_subscribe(_handler, subscription);
if (err) {
//错误日志记录 todo
rd_kafka_topic_partition_list_destroy(subscription);
return false;
}
rd_kafka_topic_partition_list_destroy(subscription);
return true;
}
是可以删除group.id,可参考:
另外一种方式是重置offset
,分2步:
时间
来获取到offset
offset
c++的我没用过,java的关键的2个命令如下,可以帮助你找找对应c++的代码:
consumer.offsetsForTimes
// 通过时间,定位offsetconsumer.seek
// 重新定位offset