2022-01-18T17:51:16.165+08:00 WARN lcresort.lcresort-web [kafka-producer-network-thread | DemoProducer] [org.apache.kafka.clients.NetworkClient:585] - Connection to node -5 terminated during authentication. This may indicate that authentication failed due to invalid credentials. 2022-01-18T17:51:16.169+08:00 WARN lcresort.lcresort-web [kafka-producer-network-thread | DemoProducer] [org.apache.kafka.clients.NetworkClient:585] - Connection to node -2 terminated during authentication. This may indicate that authentication failed due to invalid credentials. 2022-01-18T17:51:16.173+08:00 WARN lcresort.lcresort-web [kafka-producer-network-thread | DemoProducer] [org.apache.kafka.clients.NetworkClient:585] - Connection to node -4 terminated during authentication. This may indicate that authentication failed due to invalid credentials. 2022-01-18T17:51:16.181+08:00 WARN lcresort.lcresort-web [kafka-producer-network-thread | DemoProducer] [org.apache.kafka.clients.NetworkClient:585] - Connection to node -3 terminated during authentication. This may indicate that authentication failed due to invalid credentials. 2022-01-18T17:51:16.239+08:00 WARN lcresort.lcresort-web [kafka-producer-network-thread | DemoProducer] [org.apache.kafka.clients.NetworkClient:585] - Connection to node -2 terminated during authentication. This may indicate that authentication failed due to invalid credentials. ...
是因为本地时间和认证服务器时间对不上吗?
首先,你先用命令行进行生产和消费,确认下是否一切正常:kafka实战kerberos(笔记)
其次,当你java程序连接集群的时候,你观察下kafka打印的日志。
另外,你需要补充一下java认证的部分代码,看看你是如何设置认证的。
public class LoginUtil { private final static HikGaLogger LOG = HikGaLoggerFactory.getLogger(LoginUtil.class); public enum Module { STORM("StormClient"), KAFKA("KafkaClient"), ZOOKEEPER("Client"); private String name; private Module(String name) { this.name = name; } public String getName() { return name; } } /** * line operator string */ private static final String LINE_SEPARATOR = System.getProperty("line.separator"); /** * jaas file postfix */ private static final String JAAS_POSTFIX = ".jaas.conf"; /** * is IBM jdk or not */ private static final boolean IS_IBM_JDK = System.getProperty("java.vendor").contains("IBM"); /** * IBM jdk login module */ private static final String IBM_LOGIN_MODULE = "com.ibm.security.auth.module.Krb5LoginModule required"; /** * oracle jdk login module */ private static final String SUN_LOGIN_MODULE = "com.sun.security.auth.module.Krb5LoginModule required"; /** * Zookeeper quorum principal. */ public static final String ZOOKEEPER_AUTH_PRINCIPAL = "zookeeper.server.principal"; /** * java security krb5 file path */ public static final String JAVA_SECURITY_KRB5_CONF = "java.security.krb5.conf"; /** * java security login file path */ public static final String JAVA_SECURITY_LOGIN_CONF = "java.security.auth.login.config"; /** * 设置jaas.conf文件 * * @param principal * @param keytabPath * @throws IOException */ public static void setJaasFile(String principal, String keytabPath) throws IOException { String jaasPath = new File(System.getProperty("java.io.tmpdir")) + File.separator + System.getProperty("user.name") + JAAS_POSTFIX; // 删除jaas文件 deleteJaasFile(jaasPath); writeJaasFile(jaasPath, principal, keytabPath); System.setProperty(JAVA_SECURITY_LOGIN_CONF, jaasPath); } /** * 设置zookeeper服务端principal * * @param zkServerPrincipal * @throws IOException */ public static void setZookeeperServerPrincipal(String zkServerPrincipal) throws IOException { System.setProperty(ZOOKEEPER_AUTH_PRINCIPAL, zkServerPrincipal); String ret = 
System.getProperty(ZOOKEEPER_AUTH_PRINCIPAL); //LOG.debug(ret); if (ret == null) { throw new IOException(ZOOKEEPER_AUTH_PRINCIPAL + " is null."); } if (!ret.equals(zkServerPrincipal)) { throw new IOException(ZOOKEEPER_AUTH_PRINCIPAL + " is " + ret + " is not " + zkServerPrincipal + "."); } } /** * 设置krb5文件 * * @param krb5ConfFile * @throws IOException */ public static void setKrb5Config(String krb5ConfFile) throws IOException { System.setProperty(JAVA_SECURITY_KRB5_CONF, krb5ConfFile); String ret = System.getProperty(JAVA_SECURITY_KRB5_CONF); //LOG.debug(ret); if (ret == null) { throw new IOException(JAVA_SECURITY_KRB5_CONF + " is null."); } if (!ret.equals(krb5ConfFile)) { throw new IOException(JAVA_SECURITY_KRB5_CONF + " is " + ret + " is not " + krb5ConfFile + "."); } } /** * 写入jaas文件 * * @throws IOException * 写文件异常 */ private static void writeJaasFile(String jaasPath, String principal, String keytabPath) throws IOException { FileWriter writer = new FileWriter(new File(jaasPath)); try { writer.write(getJaasConfContext(principal, keytabPath)); writer.flush(); } catch (IOException e) { throw new IOException("Failed to create jaas.conf File"); } finally { writer.close(); } } private static void deleteJaasFile(String jaasPath) throws IOException { File jaasFile = new File(jaasPath); if (jaasFile.exists()) { if (!jaasFile.delete()) { throw new IOException("Failed to delete exists jaas file."); } } } private static String getJaasConfContext(String principal, String keytabPath) { Module[] allModule = Module.values(); StringBuilder builder = new StringBuilder(); for (Module modlue : allModule) { builder.append(getModuleContext(principal, keytabPath, modlue)); } return builder.toString(); } private static String getModuleContext(String userPrincipal, String keyTabPath, Module module) { StringBuilder builder = new StringBuilder(); if (IS_IBM_JDK) { builder.append(module.getName()).append(" {").append(LINE_SEPARATOR); 
builder.append(IBM_LOGIN_MODULE).append(LINE_SEPARATOR); builder.append("credsType=both").append(LINE_SEPARATOR); builder.append("principal=\"" + userPrincipal + "\"").append(LINE_SEPARATOR); builder.append("useKeytab=\"" + keyTabPath + "\"").append(LINE_SEPARATOR); builder.append("debug=true;").append(LINE_SEPARATOR); builder.append("};").append(LINE_SEPARATOR); } else { builder.append(module.getName()).append(" {").append(LINE_SEPARATOR); builder.append(SUN_LOGIN_MODULE).append(LINE_SEPARATOR); builder.append("useKeyTab=true").append(LINE_SEPARATOR); builder.append("keyTab=\"" + keyTabPath + "\"").append(LINE_SEPARATOR); builder.append("principal=\"" + userPrincipal + "\"").append(LINE_SEPARATOR); builder.append("useTicketCache=false").append(LINE_SEPARATOR); builder.append("storeKey=true").append(LINE_SEPARATOR); builder.append("debug=true;").append(LINE_SEPARATOR); builder.append("};").append(LINE_SEPARATOR); } return builder.toString(); } }
这个是启动前用来初始化、设置认证文件的代码。
命令行的生产和消费还没尝试过。
启动之后一小会儿就一直不停的报:
去kafka集群,看看日志。
kafka是华为那边的,好像拿不到这个日志。我这边Linux服务器上用klist可以看到票据信息,应该是服务器上已经认证成功了;在Windows系统上跑的程序也能成功生产消息并发送过去。目前就是在Linux上跑的程序一直报这个错误。
那基本可以聚焦是证书路径的问题了,你先写成绝对路径看看。
目前写的是绝对路径
核对几次了,是没问题的。
jaas的路径是
你问题解决了吗,我也是一直报跟你一样的错误
解决了
你对接的是华为的吗
请问您这个问题是如何解决的呢?
我对接的也是华为的
请问这个问题解决了吗
你好,请问是怎么解决的呀?我这遇到一样的报错。
你的答案