下面是初始化函数,使用的是kafka默认的配置,版本是0.8.2
static INT32 InitKafkaProduceIntf( void )
{
INT32 i = 0;
INT8 errstr[512] = {0};
INT32 iRet = 0;
rd_kafka_conf_t *conf = NULL;
rd_kafka_topic_conf_t *topic_conf = NULL;
rd_kafka_conf_res_t conf_set_value ;
//初始化话单键索引
for(i = 0;i<255;i++)
{
g_Cofig.cdr_key_index[i] = 255;
}
//初始化日志函数
InitkafkaLog();
// cdr pid defination
GET_PID((UINT8 *)"CDRPACK", &gCDRPid);
if (gCDRPid.pno > PCB_NUM)
{
SELF_PID(&gCDRPid);
gCDRPid.pno = 24;
}
SELF_PID(&gSelfPid);
//动态加载librdkafka.so
if(0 != load_rdkafka_api())
{
WRTSYSLOG_ERROR "failed to load librdkafka.so");
return C_OS_FAIL;
}
//读取配置文件
if(ReadStaticCfgFromIniFile())
{
WRTSYSLOG_ERROR "read config file wrong,init failed");
return C_OS_FAIL;
}
//初始化会话区
MallocAndInitMemory();
//创建rd_kafka_conf_t对象。
conf = zxinos_rd_kafka_conf_new();
//在rd_kafka_conf_t对象中根据属性名称设置值
zxinos_rd_kafka_conf_set(conf, "queue.buffering.max.messages", g_Cofig.broker_buff_max_msgs, NULL, 0);//异步时缓存最大消息数
zxinos_rd_kafka_conf_set(conf, "queue.buffering.max.ms",g_Cofig.broker_buff_max_ms, NULL, 0);//异步时缓存数据最大时间间隔
zxinos_rd_kafka_conf_set(conf, "message.send.max.retries",g_Cofig.broker_send_max_retries, NULL, 0);//重发次数
zxinos_rd_kafka_conf_set(conf, "batch.num.messages",g_Cofig.broker_batch_num_mesgs, NULL, 0);//异步时批处理条数
zxinos_rd_kafka_conf_set(conf, "broker.version.fallback",g_Cofig.broker_version_fallback, NULL, 0);//回退版本号
zxinos_rd_kafka_conf_set(conf, "producer.type",g_Cofig.producer_type_is_sync,NULL, 0);
//配置生产消息的回调函数
zxinos_rd_kafka_conf_set_dr_cb(conf, rcsKafkaCallBack);
//zxinos_rd_kafka_conf_set_dr_msg_cb
//创建新的kafka对象句柄,并根据其类型开始操作。
g_pRkProduce = zxinos_rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
if ( NULL == g_pRkProduce )
{
WRTSYSLOG_ERROR "InitKafkaProduceIntf: failed to create new producer: %s", errstr);
return C_OS_FAIL;
}
/* Set logger */
zxinos_rd_kafka_set_logger(g_pRkProduce, logger);
//zxinos_rd_kafka_set_log_level(g_pRkProduce, UAG_LOG_NOTICE);
/* Add brokers */
iRet = zxinos_rd_kafka_brokers_add(g_pRkProduce, g_Cofig.g_szKfkBrokers);
if ( 0 == iRet )
{
WRTSYSLOG_ERROR "InitKafkaProduceIntf: no valid brokers specified:[%s]", g_Cofig.g_szKfkBrokers );
return C_OS_FAIL;
}
WRTSYSLOG_DEBUG"InitKafkaProduceIntf: brokers specified:[%s]", g_Cofig.g_szKfkBrokers );
/* Create topic */
for (i=0; i<g_Cofig.topic_num; i++)
{
/* Topic configuration */
topic_conf = zxinos_rd_kafka_topic_conf_new();
if (NULL == topic_conf)
{
WRTSYSLOG_NOTICE "InitKafkaProduceIntf: failed to rd_kafka_topic_conf_new:[%s]",g_Cofig.topic[g_Cofig.topic_num]);
return C_OS_FAIL;
}
g_pRktProduce[i] = zxinos_rd_kafka_topic_new(g_pRkProduce, g_Cofig.topic[i].tpic_name, topic_conf);
if ( NULL == g_pRktProduce[i] )
{
WRTSYSLOG_ERROR "InitKafkaProduceIntf: failed to create topic:[%s]", g_Cofig.topic[i].tpic_name );
return C_OS_FAIL;
}
WRTSYSLOG_DEBUG "created the topic:[%s]",g_Cofig.topic[i].tpic_name);
}
//设置定时器,轮询kafka句柄
//EXTRA_SET_LOOP_TIMER(1, TIMER2, 0);
return C_OS_SUCCESS;
}
猜测:当你的主进程已经结束了,所以还有部分消息没有来得及收取到通知。
解决:增加休眠,等待接受剩余消息。
进程一直在运行,里面有个定时器一直轮询kafka句柄,几个消息很快发过去后,回调都会过来,但是回调函数带的消息号码都是最新的那个消息号码,请问大神,这个问题需要在哪里设置啊,发送函数rd_kafka_produce(rd_kafka_topic_t rkt, int32_t partitition, int msgflags, void payload, size_t len, const void key, size_t keylen, void msg_opaque);用msg_opaque来传递给回调函数消息号码
我印象中,kafka 0.8.2没有回调吧。
你的答案