给运行的kafka节点添加安全认证,切换不成功,broker之间不能通讯

安东尼的不二情 发表于: 2019-03-21   最后更新时间: 2019-03-21 16:33:05   4,507 游览

描述

运行两台无安全认证kafka的节点,尝试在运行的节点中添加SASL_PLAINTEXT认证

代码

version: '2'
services:
  zk:
    image: wurstmeister/zookeeper
    ports:
      - "2181"
  kafka1:
    image: wurstmeister/kafka
    depends_on: [ zk ]
    ports:
      - "19092:19092"
    volumes:
      - "/home/kafka/kafka_server_jaas.conf:/opt/kafka/config/kafka_server_jaas.conf"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zk:2181
      KAFKA_LISTENERS: PLAINTEXT://:19092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://xx:19092
      KAFKA_DEFAULT_REPLICATION_FACTOR: 1
      KAFKA_LOG_CLEANUP_POLICY: "delete"
      KAFKA_DELETE_TOPIC_ENABLE: "true"
      KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAINTEXT
      KAFKA_SASL_ENABLED_MECHANISMS: PLAINTEXT
   kafka2:
    image: wurstmeister/kafka
    depends_on: [ zk ]
    ports:
      - "29092:29092"
      - "29093:29093"
    volumes:
      - "/home/kafka/kafka_server_jaas.conf:/opt/kafka/config/kafka_server_jaas.conf"
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zk:2181
      KAFKA_LISTENERS: PLAINTEXT://:29092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://xx.xx:29092
      KAFKA_LOG_CLEANUP_POLICY: "delete"
      KAFKA_DELETE_TOPIC_ENABLE: "true"
      KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAINTEXT
      KAFKA_SASL_ENABLED_MECHANISMS: PLAINTEXT
  1. 创建topic ops2
    ./kafka-topics.sh --zookeeper zk:2181 --create --topic ops2 --partitions 3  --replication-factor 2
    显示IRS 为1,2
    
  2. 切换第二台brokers,其他不变
    kafka2:
     image: wurstmeister/kafka
     depends_on: [ zk ]
     ports:
       - "29092:29092"
       - "29093:29093"
     volumes:
       - "/home/kafka/kafka_server_jaas.conf:/opt/kafka/config/kafka_server_jaas.conf"
     environment:
       KAFKA_BROKER_ID: 2
       KAFKA_ZOOKEEPER_CONNECT: zk:2181
       KAFKA_LISTENERS: PLAINTEXT://:29092,SASL_PLAINTEXT://:29093
       KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://xx.xx:29092,SASL_PLAINTEXT://xx.xx:29093
       KAFKA_OPTS: -Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf
       KAFKA_LOG_CLEANUP_POLICY: "delete"
       KAFKA_DELETE_TOPIC_ENABLE: "true"
       KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.auth.SimpleAclAuthorizer
       KAFKA_SUPER_USERS: User:admin
       KAFKA_SASL_ENABLED_MECHANISMS: PLAINTEXT
    

3.重新启动第二台节点,再次查看test1的topic ISR显示仅为1
一下是broker1的异常日志,broker2无异常日志

INFO [ReplicaFetcher replicaId=1, leaderId=2, fetcherId=0] Truncating partition ops2-1 to local high watermark 0 (kafka.server.ReplicaFetcherThread)
[2019-03-21 07:18:13,413] INFO [Log partition=ops2-1, dir=/kafka/kafka-logs-753b2ceb8e58] Truncating to 0 has no effect as the largest offset in the log is -1 (kafka.log.Log)
[2019-03-21 07:18:13,910] ERROR [ReplicaFetcher replicaId=1, leaderId=2, fetcherId=0] Error for partition ops2-1 at offset 0 (kafka.server.ReplicaFetcherThread)
...
ERROR [MetadataCache brokerId=1] Listeners are not identical across brokers: Map(2 -> Map(ListenerName(PLAINTEXT) -> xx:29092 (id: 2 rack: null), ListenerName(SASL_PLAINTEXT) -> xx:29093 (id: 2 rack: null)), 1 -> Map(ListenerName(PLAINTEXT) -> xx:19092 (id: 1 rack: null))) (kafka.server.MetadataCache)
[2019-03-21 07:19:13,295] ERROR [MetadataCache brokerId=1] Listeners are not identical across brokers: Map(2 -> Map(ListenerName(PLAINTEXT) -> xx:29092 (id: 2 rack: null), ListenerName(SASL_PLAINTEXT) -> xx:29093 (id: 2 rack: null)), 1 -> Map(ListenerName(PLAINTEXT) -> xx.223:19092 (id: 1 rack: null))) (kafka.server.MetadataCache)
发表于 2019-03-21
添加评论

KAFKA_SASL_ENABLED_MECHANISMS应该是PLAIN吧

已修改,但还是一样

节点1的这俩个参数去掉,用默认的。
KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAINTEXT
KAFKA_SASL_ENABLED_MECHANISMS: PLAINTEXT

你可以参考 https://www.orchome.com/472 配置。
搜索:security.inter.broker.protocolSecurity 参考,你的协议开启的不对。

今早测试了下可以了,但还有一点就是在切换完成之后,消费者会被动停止,提示认证失败,生产者还能继续发送。然后消费者切换成认证端口能继续消费,这样正常吗

不正常的,客户端默认连的plaintext。就不会提示认证相关的错误的。

你的答案

查看kafka相关的其他问题或提一个您自己的问题