kafka producer代码发送自定义消息类型

绽放 发表于: 2017-09-18   最后更新时间: 2017-09-18 14:11:13   5,973 游览

代码1:

       for(int i = 0; i < 2; i++){                  
           final int index = i;  
           PeopleInfo people =new PeopleInfo("Daming", i);
           people.getAge();
           people.getName();
           producer.send(new ProducerRecord<String, Object>(topicName, Integer.toString(i), people), new Callback() {  

             @Override  
             public void onCompletion(RecordMetadata metadata, Exception exception) {  
                 if (metadata != null) {  
                     System.out.println(index+"  发送成功:"+"checksum: "+metadata.checksum()+" offset: "+metadata.offset()+" partition: "+metadata.partition()+" topic: "+metadata.topic());  
                 }  
                 if (exception != null) {  
                     System.out.println(index+"异常:"+exception.getMessage());  
                 }  
             }  
         });  
       } 

代码2:

producer.send(new ProducerRecord<String, Object>("second",Integer.toString(11),people), new Callback() {  

            @Override  
            public void onCompletion(RecordMetadata metadata, Exception exception) {  
                if (metadata != null) {  
                    System.out.println(1+"  发送成功:"+"checksum: "+metadata.checksum()+" offset: "+metadata.offset()+" partition: "+metadata.partition()+" topic: "+metadata.topic());  
                }  
                if (exception != null) {  
                    System.out.println(1+"异常:"+exception.getMessage());  
                }  
            }  
        });

我想实现的是我用PeopleInfo这个类封装我所要发送的消息,然后通过kafka producer发送出去,我的序列化程序(Encoder)和反序列程序(Decoder)和消息封装类PeopleInfo均已测试过没有问题,可是当我用程序1时,kafka端能够收到消息,Consumer也能根据topic进行消费,producer的send方法中的callback也能进去执行。

可是,当我把代码1换成代码2的时候,也更换了topic名,kafka端就是收不到消息,callback方法也没有执行进去,为什么会出现这种情况呢?消息封装类和序列化类和反序列类都经过测试没有问题,用代码1完全可以发送出去,代码2就是不行,望大神指教!不胜感激!

发表于 2017-09-18
添加评论

我尝试了多遍,无意中发现了是producer.close()的关系,代码1中还有这一句没有加上去。

半兽人 -> 绽放 7年前

稍等,我在忙。

半兽人 -> 绽放 7年前

没看出来差异,我给你看下我们的。

/*
从[生产者连接池]获取一条通道,然后发送[message]到broker集群

@param topicName 主题名称
@param message 消息
@param <T> 消息类型
*/
public <T> void sendMessage(String topicName, T message) throws Exception {
// 连接
Producer producer = null;

// 阻塞调用消息发送
try {
producer = producerConnPool.getProducerConn();
producer.send(new ProducerRecord<String, T>(topicName, message)).get();
} catch (InterruptedException e) {
logger.debug("send a message InterruptedException:", e);
throw e;
} catch (ExecutionException e) {
logger.debug("send a message ExecutionException:", e);
throw e;
} catch (Exception e) {
logger.error("fetch a producer connection Exception:", e);
} finally {
// 释放连接
producerConnPool.releaseConn(producer);
}
}

你的答案

查看kafka相关的其他问题或提一个您自己的问题