kafka配置了sasl之后Consumer消费不了数据

scattered and scattered 发表于: 2020-01-18   最后更新时间: 2020-01-18 22:20:50   3,093 游览

kafka配置了sasl之后Consumer消费不了数据

screenshot


screenshot

怎么样可以看到这个日志呢?或者怎么可以排查这个问题呢?

发表于 2020-01-18
¥6.0
System.setProperty("java.security.auth.login.config", System.getProperty("user.dir") + "\\kafka_client_jaas.conf");

试试这种,错误日志到服务器上看,如果认证错误会提示的。

这个是配置在java里面吗?在服务器的哪个位置看呢

while (true) {
System.setProperty("java.security.auth.login.config", System.getProperty("user.dir") + "\kafka_client_jaas.conf");
System.setProperty("sun.security.krb5.debug", "");
System.out.println("xccxxx");
ConsumerRecords records = consumer.poll(50);

我看你之前的问题,服务器还没验证通过,先用命令验证没问题了,在用java客户端验证。

另外,你得注意一下,我看你一会用这个认证,一会用那个。
你按照我的教程顺序一个一个来,别急。
https://www.orchome.com/170

大佬。是这样的,公司让我做一个消费者,然后他们公司是有做鉴权的,也把他们的代码给我,发现就是卡在那,一直不下去,然后我才想自己的服务器装个kafka,模拟下的,现在是我Kafka启动了,但是加了鉴权后,就来拿启动kafka都失败了,

所以现在我想干脆不模拟了。直接获取到错误,这个应该怎么获取呢?

贴下消费者的代码,给我看看

private static final String KAFKA_BROKER = "111.229.3.24:9092,122.173.12.42:9092";
/**
 * 消费组id
 */
private static String GROUP_ID = "test-consumer-group";
/**
 * 消费主题
 */
private static String TOPIC_NAME = "first";

static {
    try {
        props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKER);
        // 指定消费者所属群组
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        // 自动提交
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "100000000");
        /**kafka鉴权**/
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin\";");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
    } catch (Exception e) {
        logger.error("消费者" + e);
    }
}

public static void main(String[] args) {
    KafkaConsumer<string, string="" > consumer = null;
    try {
        consumer = new KafkaConsumer<string, string = "" > (props);
        consumer.subscribe(Arrays.asList(new String[]{TOPIC_NAME}));
        while (true) {
            ConsumerRecords<string, string="" > records = consumer.poll(50);
            for (ConsumerRecord<string, string="" > record :records){
                //消费数据
                System.out.println(record.value().toString() + ":sys打印的数据");
                logger.info(record.value());
            }
            Thread.sleep(10);
        }
    } catch (Exception e) {
        System.out.println("报错了:" + e);
        logger.error("消费者" + e);
    } finally {
        consumer.close();
    }
}
半兽人 -> 半兽人 4年前

我看你上面的加密钥程序,是正常的呀。
你只是没有消息,所以只是在反复的去拉取消息而已。

没,那是第二张图,我把鉴权注释了之后才可以的,您看第一幅图,一直卡在那里

没有任何报错信息吗?客户端的版本是否跟他们的版本一致?

一致的,可以问下您,1.以大佬来看,这种没反应可能是什么问题呢?2.就是关于报错信息,没有任何回应,这个应该怎么获取呢?服务器也是他们的,我们登不上去的

你把debug打开吧,我第一次遇到不报任何错误的。总是会超过60秒后报个超时啥的。

你的答案

查看kafka相关的其他问题或提一个您自己的问题