代码1:
for(int i = 0; i < 2; i++){
final int index = i;
PeopleInfo people =new PeopleInfo("Daming", i);
people.getAge();
people.getName();
producer.send(new ProducerRecord<String, Object>(topicName, Integer.toString(i), people), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (metadata != null) {
System.out.println(index+" 发送成功:"+"checksum: "+metadata.checksum()+" offset: "+metadata.offset()+" partition: "+metadata.partition()+" topic: "+metadata.topic());
}
if (exception != null) {
System.out.println(index+"异常:"+exception.getMessage());
}
}
});
}
代码2:
producer.send(new ProducerRecord<String, Object>("second",Integer.toString(11),people), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (metadata != null) {
System.out.println(1+" 发送成功:"+"checksum: "+metadata.checksum()+" offset: "+metadata.offset()+" partition: "+metadata.partition()+" topic: "+metadata.topic());
}
if (exception != null) {
System.out.println(1+"异常:"+exception.getMessage());
}
}
});
我想实现的是我用PeopleInfo这个类封装我所要发送的消息,然后通过kafka producer发送出去,我的序列化程序(Encoder)和反序列程序(Decoder)和消息封装类PeopleInfo均已测试过没有问题,可是当我用程序1时,kafka端能够收到消息,Consumer也能根据topic进行消费,producer的send方法中的callback也能进去执行。
可是,当我把代码1换成代码2的时候,也更换了topic名,kafka端就是收不到消息,callback方法也没有执行进去,为什么会出现这种情况呢?消息封装类和序列化类和反序列类都经过测试没有问题,用代码1完全可以发送出去,代码2就是不行,望大神指教!不胜感激!
我尝试了多遍,无意中发现了是producer.close()的关系,代码1中还有这一句没有加上去。
稍等,我在忙。
没看出来差异,我给你看下我们的。
你的答案