kafka配置了sasl之后Consumer消费不了数据
怎么样可以看到这个日志呢?或者怎么可以排查这个问题呢?
System.setProperty("java.security.auth.login.config", System.getProperty("user.dir") + "\\kafka_client_jaas.conf");
试试这种,错误日志到服务器上看,如果认证错误会提示的。
这个是配置在java里面吗?在服务器的哪个位置看呢
while (true) { System.setProperty("java.security.auth.login.config", System.getProperty("user.dir") + "\kafka_client_jaas.conf"); System.setProperty("sun.security.krb5.debug", ""); System.out.println("xccxxx"); ConsumerRecords records = consumer.poll(50);
我看你之前的问题,服务器还没验证通过,先用命令验证没问题了,在用java客户端验证。
另外,你得注意一下,我看你一会用这个认证,一会用那个。你按照我的教程顺序一个一个来,别急。https://www.orchome.com/170
大佬。是这样的,公司让我做一个消费者,然后他们公司是有做鉴权的,也把他们的代码给我,发现就是卡在那,一直不下去,然后我才想自己的服务器装个kafka,模拟下的,现在是我Kafka启动了,但是加了鉴权后,就来拿启动kafka都失败了,
所以现在我想干脆不模拟了。直接获取到错误,这个应该怎么获取呢?
贴下消费者的代码,给我看看
private static final String KAFKA_BROKER = "111.229.3.24:9092,122.173.12.42:9092"; /** * 消费组id */ private static String GROUP_ID = "test-consumer-group"; /** * 消费主题 */ private static String TOPIC_NAME = "first"; static { try { props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKER); // 指定消费者所属群组 props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID); // 自动提交 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "100000000"); /**kafka鉴权**/ props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin\";"); props.put("security.protocol", "SASL_PLAINTEXT"); props.put("sasl.mechanism", "PLAIN"); } catch (Exception e) { logger.error("消费者" + e); } } public static void main(String[] args) { KafkaConsumer<string, string="" > consumer = null; try { consumer = new KafkaConsumer<string, string = "" > (props); consumer.subscribe(Arrays.asList(new String[]{TOPIC_NAME})); while (true) { ConsumerRecords<string, string="" > records = consumer.poll(50); for (ConsumerRecord<string, string="" > record :records){ //消费数据 System.out.println(record.value().toString() + ":sys打印的数据"); logger.info(record.value()); } Thread.sleep(10); } } catch (Exception e) { System.out.println("报错了:" + e); logger.error("消费者" + e); } finally { consumer.close(); } }
我看你上面的加密钥程序,是正常的呀。你只是没有消息,所以只是在反复的去拉取消息而已。
没,那是第二张图,我把鉴权注释了之后才可以的,您看第一幅图,一直卡在那里
没有任何报错信息吗?客户端的版本是否跟他们的版本一致?
一致的,可以问下您,1.以大佬来看,这种没反应可能是什么问题呢?2.就是关于报错信息,没有任何回应,这个应该怎么获取呢?服务器也是他们的,我们登不上去的
你把debug打开吧,我第一次遇到不报任何错误的。总是会超过60秒后报个超时啥的。
找不到想要的答案?提一个您自己的问题。
0 声望
这家伙太懒,什么都没留下
System.setProperty("java.security.auth.login.config", System.getProperty("user.dir") + "\\kafka_client_jaas.conf");
试试这种,错误日志到服务器上看,如果认证错误会提示的。
这个是配置在java里面吗?在服务器的哪个位置看呢
while (true) { System.setProperty("java.security.auth.login.config", System.getProperty("user.dir") + "\kafka_client_jaas.conf"); System.setProperty("sun.security.krb5.debug", ""); System.out.println("xccxxx"); ConsumerRecords records = consumer.poll(50);
我看你之前的问题,服务器还没验证通过,先用命令验证没问题了,在用java客户端验证。
另外,你得注意一下,我看你一会用这个认证,一会用那个。
你按照我的教程顺序一个一个来,别急。
https://www.orchome.com/170
大佬。是这样的,公司让我做一个消费者,然后他们公司是有做鉴权的,也把他们的代码给我,发现就是卡在那,一直不下去,然后我才想自己的服务器装个kafka,模拟下的,现在是我Kafka启动了,但是加了鉴权后,就来拿启动kafka都失败了,
所以现在我想干脆不模拟了。直接获取到错误,这个应该怎么获取呢?
贴下消费者的代码,给我看看
private static final String KAFKA_BROKER = "111.229.3.24:9092,122.173.12.42:9092"; /** * 消费组id */ private static String GROUP_ID = "test-consumer-group"; /** * 消费主题 */ private static String TOPIC_NAME = "first"; static { try { props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKER); // 指定消费者所属群组 props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID); // 自动提交 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "100000000"); /**kafka鉴权**/ props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin\";"); props.put("security.protocol", "SASL_PLAINTEXT"); props.put("sasl.mechanism", "PLAIN"); } catch (Exception e) { logger.error("消费者" + e); } } public static void main(String[] args) { KafkaConsumer<string, string="" > consumer = null; try { consumer = new KafkaConsumer<string, string = "" > (props); consumer.subscribe(Arrays.asList(new String[]{TOPIC_NAME})); while (true) { ConsumerRecords<string, string="" > records = consumer.poll(50); for (ConsumerRecord<string, string="" > record :records){ //消费数据 System.out.println(record.value().toString() + ":sys打印的数据"); logger.info(record.value()); } Thread.sleep(10); } } catch (Exception e) { System.out.println("报错了:" + e); logger.error("消费者" + e); } finally { consumer.close(); } }
我看你上面的加密钥程序,是正常的呀。
你只是没有消息,所以只是在反复的去拉取消息而已。
没,那是第二张图,我把鉴权注释了之后才可以的,您看第一幅图,一直卡在那里
没有任何报错信息吗?客户端的版本是否跟他们的版本一致?
一致的,可以问下您,1.以大佬来看,这种没反应可能是什么问题呢?2.就是关于报错信息,没有任何回应,这个应该怎么获取呢?服务器也是他们的,我们登不上去的
你把debug打开吧,我第一次遇到不报任何错误的。总是会超过60秒后报个超时啥的。
你的答案