请教大神,使用kafka的C语言库开发的时候,当发送速度非常快的时候,有的消息发送过后没有回调函数,这种情况如何处理

qzuser 发表于: 2017-10-10   最后更新时间: 2017-10-10 10:41:28   5,041 游览

下面是初始化函数,使用的是kafka默认的配置,版本是0.8.2

static INT32 InitKafkaProduceIntf( void )
{
    INT32 i = 0;
    INT8 errstr[512] = {0};
    INT32 iRet = 0;
    rd_kafka_conf_t *conf = NULL;
    rd_kafka_topic_conf_t *topic_conf = NULL;
    rd_kafka_conf_res_t conf_set_value ;

    //初始化话单键索引
    for(i = 0;i<255;i++)
    {
        g_Cofig.cdr_key_index[i] = 255;
    }

    //初始化日志函数
    InitkafkaLog();

    // cdr pid defination
    GET_PID((UINT8 *)"CDRPACK", &gCDRPid);
    if (gCDRPid.pno > PCB_NUM)
    {
        SELF_PID(&gCDRPid);
        gCDRPid.pno = 24;
    }

    SELF_PID(&gSelfPid);

    //动态加载librdkafka.so
    if(0 != load_rdkafka_api())
    {
        WRTSYSLOG_ERROR "failed to load librdkafka.so");
        return C_OS_FAIL;
    }

    //读取配置文件
    if(ReadStaticCfgFromIniFile())
    {
        WRTSYSLOG_ERROR "read config file wrong,init failed");
        return C_OS_FAIL;
    }

    //初始化会话区
    MallocAndInitMemory();

    //创建rd_kafka_conf_t对象。
    conf = zxinos_rd_kafka_conf_new();

    //在rd_kafka_conf_t对象中根据属性名称设置值
    zxinos_rd_kafka_conf_set(conf, "queue.buffering.max.messages", g_Cofig.broker_buff_max_msgs, NULL, 0);//异步时缓存最大消息数
    zxinos_rd_kafka_conf_set(conf, "queue.buffering.max.ms",g_Cofig.broker_buff_max_ms, NULL, 0);//异步时缓存数据最大时间间隔
    zxinos_rd_kafka_conf_set(conf, "message.send.max.retries",g_Cofig.broker_send_max_retries, NULL, 0);//重发次数
    zxinos_rd_kafka_conf_set(conf, "batch.num.messages",g_Cofig.broker_batch_num_mesgs, NULL, 0);//异步时批处理条数
    zxinos_rd_kafka_conf_set(conf, "broker.version.fallback",g_Cofig.broker_version_fallback, NULL, 0);//回退版本号
    zxinos_rd_kafka_conf_set(conf, "producer.type",g_Cofig.producer_type_is_sync,NULL, 0);

    //配置生产消息的回调函数
    zxinos_rd_kafka_conf_set_dr_cb(conf, rcsKafkaCallBack);
    //zxinos_rd_kafka_conf_set_dr_msg_cb


    //创建新的kafka对象句柄,并根据其类型开始操作。
    g_pRkProduce = zxinos_rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
    if ( NULL == g_pRkProduce )
    {
        WRTSYSLOG_ERROR "InitKafkaProduceIntf: failed to create new producer: %s", errstr);
        return C_OS_FAIL;
    }

    /* Set logger */
    zxinos_rd_kafka_set_logger(g_pRkProduce, logger);
    //zxinos_rd_kafka_set_log_level(g_pRkProduce, UAG_LOG_NOTICE);

    /* Add brokers */
    iRet = zxinos_rd_kafka_brokers_add(g_pRkProduce, g_Cofig.g_szKfkBrokers);
    if ( 0 == iRet )
    {
        WRTSYSLOG_ERROR "InitKafkaProduceIntf: no valid brokers specified:[%s]", g_Cofig.g_szKfkBrokers );
        return C_OS_FAIL;
    }
    WRTSYSLOG_DEBUG"InitKafkaProduceIntf:  brokers specified:[%s]", g_Cofig.g_szKfkBrokers );

    /* Create topic */
    for (i=0; i<g_Cofig.topic_num; i++)
    {
        /* Topic configuration */
        topic_conf = zxinos_rd_kafka_topic_conf_new();
        if (NULL == topic_conf)
        {
            WRTSYSLOG_NOTICE "InitKafkaProduceIntf: failed to rd_kafka_topic_conf_new:[%s]",g_Cofig.topic[g_Cofig.topic_num]);
            return C_OS_FAIL;
        }

        g_pRktProduce[i] = zxinos_rd_kafka_topic_new(g_pRkProduce, g_Cofig.topic[i].tpic_name, topic_conf);
        if ( NULL == g_pRktProduce[i] )
        {
            WRTSYSLOG_ERROR "InitKafkaProduceIntf: failed to create topic:[%s]", g_Cofig.topic[i].tpic_name );
            return C_OS_FAIL;
        }
        WRTSYSLOG_DEBUG "created the topic:[%s]",g_Cofig.topic[i].tpic_name);
    }
    //设置定时器,轮询kafka句柄
    //EXTRA_SET_LOOP_TIMER(1, TIMER2, 0);

    return C_OS_SUCCESS;

}
发表于 2017-10-10
添加评论

猜测:当你的主进程已经结束了,所以还有部分消息没有来得及收取到通知。
解决:增加休眠,等待接受剩余消息。

qzuser -> 半兽人 7年前

进程一直在运行,里面有个定时器一直轮询kafka句柄,几个消息很快发过去后,回调都会过来,但是回调函数带的消息号码都是最新的那个消息号码,请问大神,这个问题需要在哪里设置啊,发送函数rd_kafka_produce(rd_kafka_topic_t rkt, int32_t partitition, int msgflags, void payload, size_t len, const void key, size_t keylen, void msg_opaque);用msg_opaque来传递给回调函数消息号码

半兽人 -> qzuser 7年前

我印象中,kafka 0.8.2没有回调吧。

你的答案

查看kafka相关的其他问题或提一个您自己的问题