按照orcHome网上的教程写了消费者程序,但是不能消费消息,但是用Kafka命令能消费消息,以下是我的相关配置。
kafka版本是kafka_2.11-1.0.1
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.1.0</version>
</dependency>
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.202.165:9092,192.168.201.168:9092");//服务器地址
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");//消息者组
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");//是否自动提交偏移量
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");//指定的时间内发送心跳给群组的时间
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG,"10000");//送心跳的频率一般设置成session.timeout.ms值的3分之一。
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1048576");//消息者从服务器获取记录的最小字节数。
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");//等到有足够的数据时才返回给消费者,看和fetch.min.bytes参数哪个先满足。
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");//从头条开始处理消费
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
consumer = new KafkaConsumer<>(props);
public void run() {
String topic=Global.getConfig("get_topic");
consumer.subscribe(Arrays.asList(topic));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.println("tostring="+record.toString()+"|key="+record.key()+"|value="+record.value());
}
consumer.commitSync();
}
}
我用Debug跟踪了一下,发现卡在AbstractCoordinator类中ensureCoordinatorReady方法这块了,老是进行循环,打印log.debug("Coordinator discovery failed, refreshing metadata")
。
protected synchronized boolean ensureCoordinatorReady(long startTimeMs, long timeoutMs) {
long remainingMs = timeoutMs;
while (coordinatorUnknown()) {
RequestFuture<Void> future = lookupCoordinator();
client.poll(future, remainingMs);
if (future.failed()) {
if (future.isRetriable()) {
remainingMs = timeoutMs - (time.milliseconds() - startTimeMs);
if (remainingMs <= 0)
break;
*log.debug("Coordinator discovery failed, refreshing metadata");*
client.awaitMetadataUpdate(remainingMs);
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we found the coordinator, but the connection has failed, so mark
// it dead and backoff before retrying discovery
markCoordinatorUnknown();
time.sleep(retryBackoffMs);
}
remainingMs = timeoutMs - (time.milliseconds() - startTimeMs);
if (remainingMs <= 0)
break;
}
return !coordinatorUnknown();
}
睡觉前对这个问题不死心,打开电脑又查了查,终于解决了问题。主要是因为我服务器之前装过Kafka所以产生了脏数据。
以下是解决过程:
1、进入zookeeper 运行
zkCli.sh
。2、运行
ls /brokers/topics
查看主题3、然后运行
rmr /brokers/topics/consumer_offsets
删除consumer_offsets_
主题4、然后重启kafka集群就好了。
赞。
我也是这个问题,最后同样的方式解决了。才看到这条,相见恨晚
我也是这个问题,最后同样的方式解决了。感谢楼主
有没有报错?
控制台打印这个,也没啥错误啊,消息是我用命令发送的。/usr/local/kafka/bin/kafka-console-
producer.sh --broker-list 192.168.202.165:9092 --topic test 2018-04-02 22:03:08,170 INFO [main] org.springframework.boot.StartupInfoLogger[logStarted 57]: Started Application in 15.684 seconds (JVM running for 16.758) 2018-04-02 22:03:08,666 INFO [pool-2-thread-1] org.apache.kafka.clients.Metadata[update 265]: Cluster ID: 380dL-1DRV2PUuB1NLMSbw
调试了好几次小时了,没有解决真是郁闷。。但是我用以下这个jar包就能消费消息,感觉还是代码哪块没写好。
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.8.0</artifactId> <version>0.8.0-beta1</version> </dependency>
在老项目里用这个代码就能消息到消息。
String topic = Global.getConfig("topic_task_history");//接收机器返回任务状态的topic Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(topic, 1); Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer .createMessageStreams(topicCountMap); KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0);// 获取每次接收到的这个数据 ConsumerIterator<byte[], byte[]> iterator = stream.iterator(); while (iterator.hasNext()) { MessageAndMetadata<byte[], byte[]> msg = iterator.next(); String message; try { message = new String(msg.message(),"UTF-8"); System.out.println("message="+message); } catch (UnsupportedEncodingException e) { logger.error("监控历史消息|异常",e); } }
改了参数,直接运行试试。
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.Arrays; import java.util.Properties; public class ConsumerTest { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "172.30.34.4:9092"); props.put("group.id", "myself"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("TRADE-NOTIFY")); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) System.out.printf("offset = %d, key = %s, value = %s \r\n", record.offset(), record.key(), record.value()); } } }
刚试了一下,用这个测试类还是不行。难道是我Kafka服务器配置有问题?
没错误,实在是不知道怎么帮你
我也挺郁闷的。。刚重新装了Kafka了也不行。这个日志信息是啥意思呢?log.debug("Coordinator discovery failed, refreshing metadata")
你telnet下你的服务器。9092端口的
都是通的。。
确保topic中有消息。
你的答案