为什么kafka消费消息不完整?
我在代码里produce,在Shell里consumer,为什么consumer不完整?只到767,不到999?
代码如下:
package com.lzm.demo.demo_kafka; import java.util.Arrays; import java.util.Properties; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; /** * * @author * @date 2016 1 11 4:18:56 */ public class kafkaConsumer extends Thread { private String[] topics; public kafkaConsumer(String[] topics) { super(); this.topics = topics; } @Override public void run() { Consumer consumer = createConsumer(); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.printf("key = %s, value = %s, offset = %d\n", record.key(), record.value(), record.offset()); } } } private Consumer createConsumer() { Properties props = new Properties(); props.put("bootstrap.servers", "192.168.14.237:9091,192.168.14.237:9092,192.168.14.237:9093"); props.put("group.id", "d"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("test", "foo")); return consumer; } public static void main(String[] args) { new kafkaConsumer(new String[] { "testt", "bar" }).start(); } }
就是拿不到消息
在程序main方法中,增加休眠,
Thread.sleep(2000);
因为kafka发送的消息,还在缓存中,还没发送。
而主进程跑完结束了,导致线程终结,消息丢失。
楼主集群中kafka的版本是多少?我的代码跟你一样,但是老报错
你的答案