有两个问题:
1. 本例中使用的是高级消费者api : KafkaConsumer。源码rdkafkacpp.h里面说调用consume消费消息时,会自动调用reblance事件回调,为什么我设置reblance消息回调没有被调用到:
/**
class RD_EXPORT KafkaConsumer : public virtual Handle {
...
* @brief Consume message or get error event, triggers callbacks.
* Will automatically call registered callbacks for any such queued events,
* including RdKafka::RebalanceCb, RdKafka::EventCb, RdKafka::OffsetCommitCb,
* etc.
* ....
virtual Message *consume (int timeout_ms) = 0;
...
}
*/
注:继承RdKafka::RebalanceCb就可以实现回调
namespace EDU_SP_KAFKA {
class KafkaRebalanceCb : public RdKafka::RebalanceCb {
public:
KafkaRebalanceCb() = default;
~KafkaRebalanceCb() = default;
virtual void rebalance_cb(RdKafka::KafkaConsumer *consumer,
RdKafka::ErrorCode err,
std::vector<RdKafka::TopicPartition*> &partitions) override;
}
int KafkaConsumeImp::initConsumer() {
std::string errstr;
// create KafkaRebalanceCb & KafkaEventCb callback object
kafka_event_cb_ = std::make_shared<EDU_SP_KAFKA::KafkaEventCb>();
kafka_reblance_cb_ = std::shared_ptr<EDU_SP_KAFKA::KafkaRebalanceCb>();
//TODO:好像设置的回调没起作用
kafka_conf_->set("event_cb", kafka_event_cb_.get(), errstr);
kafka_conf_->set("rebalance_cb", kafka_reblance_cb_.get(), errstr)
...};
2. 我是基于librdkafka c++库的基础上再封装了一层sdk提供给上层业务来使用 ,开单独的线程去消费消息,并且模拟了一下业务demo调用,消息消费正常。想问一下进程启动之后各个线程是做什么的,线程启动如下:
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
42545 lvfuchao 20 0 473888 4496 4136 S 0.0 0.0 0:00.00 kafka-consumer-
42546 lvfuchao 20 0 473888 4496 4136 S 0.0 0.0 0:00.01 rdk:broker-1
42547 lvfuchao 20 0 473888 4496 4136 S 0.0 0.0 0:00.78 rdk:main
42548 lvfuchao 20 0 473888 4496 4136 S 0.0 0.0 0:00.00 rdk:broker-1
42549 lvfuchao 20 0 473888 4496 4136 S 0.0 0.0 0:00.30 rdk:broker1
42550 lvfuchao 20 0 473888 4496 4136 S 0.0 0.0 0:00.28 rdk:broker2
42551 lvfuchao 20 0 473888 4496 4136 S 0.0 0.0 0:00.61 rdk:broker3
42552 lvfuchao 20 0 473888 4496 4136 S 0.0 0.0 0:00.00 kafka-consumer-
我的理解是:
(1) broke1、broker2、broker3:是分别用来和三个kafka集群进行交互的
(2)42545(kafka-consumer-):是消息事件回调:RdKafka::RebalanceCb, RdKafka::EventCb, RdKafka::OffsetCommitCb,
(3)42552(kafka-consumer-):是开的单个线程去消费消息
(4) 还有两个broker-1不知道做什么用的
线程消费消息代码如下:
bool KafkaConsumeImp::start_consumer() {
// begin consume thread
thread_ = std::make_unique<std::thread>([this] {
this->start();
});}
bool KafkaConsumeImp::start() {
/*Consume messages*/
if(started_) {
return -1;
}
if(!kafka_consumer_) {
return -2;
}
try {
while (EDU_SP_KAFKA::run) {
RdKafka::Message *msg = kafka_consumer_->consume(6000);
consumer_cb(msg, nullptr);
} catch (std::exception &e) {
std::cout << "Execption: consuemer_ start failure !!!" << std::endl;
return -4;
}
started_ = true;
return true;
}
1、本例中使用的是高级消费者api : KafkaConsumer。源码rdkafkacpp.h里面说调用consume消费消息时,会自动调用reblance事件回调,为什么我设置reblance消息回调没有被调用到:
2、线程用途不太清楚,我觉得主要有两种:
其他就不了解了。。
嗯嗯 多谢回答
第一: kafka rebalance触发条件:
第二:kafka线程问题:
您好,大神可以可以解答一下我的疑问,谢谢!!
我不会c额,所以不敢回答你额。我凭着粗浅的经验,给你分析一下吧,我在群里帮你吆喝一下看有c语言的高手在不。
reblance这个回调通知,像是kafka的topic选举leader的时候,或者有新的消费者加入,才会通知你,而不是消息的通知。
./kafka-consumer-sdk kafkasdk_set_msg_callback success :1 % Created consumer name is:rdkafka#consumer-1 main thread is running !!! RebalanceCb: Local: Assign partitions: test[0], test[1], test1[0], test2[0], test2[1]
对的 ,消费者新增就会调用,但是问题1的方式把rebalance_cb设置为智能指针的方式不会触发回调,我后面又试了一下,把这两个回调重新封装成两个单例,然后就可以触发回调,代码如下:
class KafkaRebalanceCb : public RdKafka::RebalanceCb, public Singleton { public: KafkaRebalanceCb() = default; ~KafkaRebalanceCb() = default; virtual void rebalance_cb(RdKafka::KafkaConsumer *consumer, RdKafka::ErrorCode err, std::vector<RdKafka::TopicPartition*> &partitions) override; }; //设置回调 kafkaconf->set("eventcb", KafkaEventCb::Instance(), errstr); kafka_conf->set("rebalance_cb", KafkaRebalanceCb::Instance(), errstr);
以后有c的问题,可以找你了!
您好,大佬 这两个问题有没有答案哦
你的答案