Kafka SASL认证与ACL配置

发布时间:2025-06-24 16:52:30  作者:北方职教升学中心  阅读量:369


​ Kafka版本 2.12-2.2.0,Zookeeper版本:3.4.14,认证方式:SASL/PLAIN,这种方式其实就是一个账号/密码的认证方式,不过它有很多缺陷,比如用户名密码是存储在文件中,不能动态添加,密码明文等等!建议大家用SASL/SCRAM的方式 ,这种方式 用户名/密码是存储在zookeeper中,能够支持动态添加用户。该种认证方式还会使用sha256或sha512对密码加密,安全性相对会高一些。本文主要介绍SASL/PLAIN方式:

一、SSL证书生成

(1)创建证书目录

#kafka部署目录为/usr/local/kafka_2.12-2.2.0mkdir/usr/local/kafka_2.12-2.2.0/ssl

(2)生成服务端的keystore文件(server.keystore.jks

​ 每个broker节点执行

keytool -keystoreserver.keystore.jks -aliaskafka -validity3650-genkey-storepass123456-keypass123456-dname"CN=192.168.11.35,OU=hc,O=hw,L=shenzhen,ST=guangdong,C=CN"#CN=名字与姓氏/域名,OU=组织单位名称,O=组织名称,L=城市或区域名称,ST=州或省份名称,C=单位的两字母国家代码#查看证书命令keytool -list-v-keystore./server.keystore.jks

(3)生成CA认证证书(ca-cert、ca-key

​ 在任一broker节点执行,只需要执行一次,执行完成后生成了两个文件cat-key、ca-cert,将这两个文件分别拷贝到所有broker节点上。

openssl req -new-x509-keyoutca-key -outca-cert -days3650-passoutpass:123456 -subj"/C=cn/ST=guangdong/L=shenzhen/O=hw/OU=hc/CN=192.168.11.35"

(4)通过CA证书创建一个客户端信任证书(client.truststore.jks

​ 每个broker节点执行

keytool -keystoreclient.truststore.jks -aliasCAKafka -import-fileca-cert -storepass123456-noprompt

(5)通过CA证书创建一个服务端信任证书(server.truststore.jks

​ 每个broker节点执行

keytool -keystoreserver.truststore.jks -aliasCAKafka -import-fileca-cert -storepass123456-noprompt

以下为给服务端证书签名

(6)从密钥库导出证书服务端证书(cert-file

​ 每个broker节点执行

keytool -keystoreserver.keystore.jks -aliaskafka -certreq-filecert-file -storepass123456

(7)用CA给服务端证书进行签名处理(cert-signed

​ 每个broker节点执行

openssl x509 -req-CAca-cert -CAkeyca-key -incert-file -outcert-signed -days3650-CAcreateserial-CAserialca-cert.srl -passinpass:123456

(8)将CA证书导入到服务端keystore

​ 每个broker节点执行

keytool -keystoreserver.keystore.jks -aliasCAKafka -import-fileca-cert -storepass123456-noprompt

(9)将已签名的服务器证书导入到服务器keystore

​ 每个broker节点执行

keytool -keystoreserver.keystore.jks -aliaskafka -import-filecert-signed -storepass123456

客户端SSL证书签发

(10)导出客户端证书(client.keystore.jks

keytool -keystoreclient.keystore.jks -aliaskafka -validity3650-genkey-storepass123456-keypass123456-dname"CN=192.168.11.35,OU=hc,O=hw,L=shenzhen,ST=guangdong,C=CN"

(11)将证书文件导入到客户端keystore(client.cert-file

keytool -keystoreclient.keystore.jks -aliaskafka -certreq-fileclient.cert-file -storepass123456

(12)用CA给客户端证书进行签名处理(client.cert-signed

openssl x509 -req-CAca-cert -CAkeyca-key -inclient.cert-file -outclient.cert-signed -days3650-CAcreateserial-CAserialca-cert.srl -passinpass:123456

(13)将CA证书导入到客户端keystore

keytool -keystoreclient.keystore.jks -aliasCAKafka -import-fileca-cert -storepass123456-noprompt

(14)将已签名的证书导入到客户端keystore

keytool -keystoreclient.keystore.jks -aliaskafka -import-fileclient.cert-signed -storepass123456

四个认证文件说明:

  1. client.keystore.jks
    是客户端密钥库文件。它包含客户端的私钥和相应的证书。客户端使用该私钥进行身份验证和安全通信。客户端的私钥用于加密传输给服务器的数据,并与服务器的公钥进行握手和密钥交换,确保通信的安全性和完整性。
  2. client.truststore.jks
    是客户端信任库文件。它包含客户端信任的根证书或证书链。客户端使用该信任库来验证服务器的证书是否可信。如果服务器的证书由客户端信任库中的任何一个根证书所签发,客户端将信任服务器的证书,并继续与服务器进行安全通信。
  3. server.keystore.jks
    是服务器密钥库文件。它包含服务器的私钥和相应的证书。服务器使用该私钥进行身份验证和安全通信。服务器的私钥用于解密客户端发送的数据,并与客户端的公钥进行握手和密钥交换,确保通信的安全性和完整性。
  4. server.truststore.jks
    是服务器信任库文件。它包含服务器信任的根证书或证书链。服务器使用该信任库来验证客户端的证书是否可信。如果客户端的证书由服务器信任库中的任何一个根证书所签发,服务器将信任客户端的证书,并继续与客户端进行安全通信。

二、Kafka配置

2.1、服务端配置

(1)修改kafka配置文件 server.properties

/usr/local/kafka_2.12-2.2.0/config/server.properties#修改listeners,添加SASL_PLAINTEXT://192.168.11.35:9093listeners=SASL_PLAINTEXT://192.168.11.35:9093,PLAINTEXT://192.168.1.35:9092#对外发布地址,添加SASL_PLAINTEXT://39.108.132.165:9093,39.108.132.165为公网IPadvertised.listeners=SASL_PLAINTEXT://39.108.132.165:9093,PLAINTEXT://192.168.11.35:9092#添加以下为sasl安全配置security.inter.broker.protocol=SASL_PLAINTEXT	#表示Broker间通信使用SASL_PLAINTEXTsasl.enabled.mechanisms=PLAIN	#表示开启PLAIN认证机制sasl.mechanism.inter.broker.protocol=PLAIN	#表示Broker间通信也启用PLAIN机制authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer	#设置身份验证使用的类super.users=User:admin	#设置超级用户,如果是多个需要分号分割,例如:User:admin;User:rootallow.everyone.if.no.acl.found=false	#默认为true,对所有用户topic可见,要禁用。ssl.keystore.location=/usr/local/kafka_2.12-2.2.0/ssl/server.keystore.jksssl.keystore.password=123456ssl.key.password=123456ssl.truststore.location=/usr/local/kafka_2.12-2.2.0/ssl/server.truststore.jksssl.truststore.password=123456# kafka2.0.x开始,将ssl.endpoint.identification.algorithm设置为了HTTPS,即:需要验证主机名# 如果不需要验证主机名,那么可以这么设置 ssl.endpoint.identification.algorithm=即可ssl.endpoint.identification.algorithm=#日志目录log.dirs=/usr/local/kafka_2.12-2.2.0/logs#连接Zookeeper的配置zookeeper.connect=localhost:2181zookeeper.connection.timeout.ms=6000#其它配置使用默认即可

如果用 SASL_SSL的认证方式,只需将以上配置文件中 SASL_PLAINTEXT改为 SASL_SSL即可。

如果要实现外网 9093端口使用 SASL_SSL,内网 9092端口使用 SASL_PLAINTEXT,可以将 listeners 和 advertised.listeners配置改成如下形式:

listeners=SASL_SSL://192.168.11.35:9093,SASL_PLAINTEXT://192.168.1.35:9092

advertised.listeners=SASL_SSL://39.108.132.165:9093,SASL_PLAINTEXT://192.168.11.35:9092

其它配置不变,这样可以实现在内网连接 9092 端口,不需要 jks 文件,直接使用账号密码的方式连接 kafka ,同时配合 ACL 规则实现 topic 和 group 的权限 ;外网则连接 9093 端口,并需要 jks 文件验证建立连接,再通过账号密码配合 ACL 规则实现 topic 和 group 的权限。

(2)添加SASL配置文件

​ 在kafka配置目录 **/usr/local/kafka_2.12-2.2.0/config **下创建一个 kafka_server_jaas.conf文件。

vim/usr/local/kafka_2.12-2.2.0/config/kafka_server_jaas.conf KafkaServer {org.apache.kafka.common.security.plain.PlainLoginModule required    username="admin"password="1qaz@WSX"user_admin="1qaz@WSX"user_kafka="1qaz@WSX";};Client {org.apache.zookeeper.server.auth.DigestLoginModule requiredusername="admin"password="1qaz@WSX";};

KafkaServer字段是用来配置broker间通信使用的用户名和密码以及客户端连接时需要的用户名和密码,其中username和password是broker用于初始化连接到其他的broker,kafka用户为broker间的通讯。在一个Kafka集群中,这个文件的内容要一样,集群中每个Borker去连接其他Broker的时候都使用这个文件中定义的username和password来让对方进行认证。

user_NAME语句,是用来配置客户端连接Broker时使用的用户名和密码,也就是新建用户的用户名都是以user_开头的,等号后面是密码,密码可以是明文且没有复杂度要求。完整语句是user_USERNAME="PASSWORD"

Client部分是用来设置与Zookeeper的连接的,它还允许broker设置 SASL ACL 到zookeeper 节点,锁定这些节点,只有broker可以修改它。如果Zookeeper与Broker之间不设置认证,那么就可以不配置。验证类org.apache.zookeeper.server.auth.DigestLoginModule 也可以使用org.apache.kafka.common.security.plain.PlainLoginModule ,这个类需要用到 kafka 相关的 jar 包,后续3.1节有说明。

(3)在kafka启动脚本中配置环境变量

​ 在 /usr/local/kafka_2.12-2.2.0/bin/kafka-server-start.sh配置中增加环境变量:

-Djava.security.auth.login.config=/usr/local/kafka_2.12-2.2.0/config/kafka_server_jaas.conf

vim/usr/local/kafka_2.12-2.2.0/bin/kafka-server-start.sh...if["x$KAFKA_HEAP_OPTS"="x"];thenexportKAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=/usr/local/kafka_2.12-2.2.0/config/kafka_server_jaas.conf"fi

(4)启动kafka

/usr/local/kafka_2.12-2.2.0/bin/kafka-server-start.sh -daemon/usr/local/kafka_2.12-2.2.0/config/server.properties

2.2、客户端配置

(1)添加SASL配置文件

​ 在kafka配置目录 **/usr/local/kafka_2.12-2.2.0/config **下创建一个 kafka-client-jaas.conf文件。

vim/usr/local/kafka_2.12-2.2.0/config/kafka_client_jaas.conf KafkaClient {org.apache.kafka.common.security.plain.PlainLoginModule required    username="kafka"password="1qaz@WSX";};

说明:这里配置用户名和密码需要和服务端配置的账号密码保持一致,这里配置了kafka这个用户。

(2)在kafka-console-producer.sh脚本中配置环境变量

​ 找到 “x$KAFKA_HEAP_OPTS”,添加以下参数:

-Djava.security.auth.login.config=/usr/local/kafka_2.12-2.2.0/config/kafka_client_jaas.conf

vim/usr/local/kafka_2.12-2.2.0/bin/kafka-console-consumer.sh...if["x$KAFKA_HEAP_OPTS"="x"];thenexportKAFKA_HEAP_OPTS="-Xmx512M -Djava.security.auth.login.config=/usr/local/kafka_2.12-2.2.0/config/kafka_client_jaas.conf"fi...

(3)在kafka-console-consumer.sh脚本中配置环境变量

​ 找到 “x$KAFKA_HEAP_OPTS”,添加以下参数:

-Djava.security.auth.login.config=/usr/local/kafka_2.12-2.2.0/config/kafka_client_jaas.conf

...if["x$KAFKA_HEAP_OPTS"="x"];thenexportKAFKA_HEAP_OPTS="-Xmx512M -Djava.security.auth.login.config=/usr/local/kafka_2.12-2.2.0/config/kafka_client_jaas.conf"fi...

(4)配置SASL_PLAINTEXT验证信息

​ 修改 /usr/local/kafka_2.12-2.2.0/config下的 producer.propertiesconsumer.properties配置SASL_SSL验证的基本信息:

producer.properties:

vim/usr/local/kafka_2.12-2.2.0/config/producer.properties#添加以下内容security.protocol=SASL_PLAINTEXTsasl.mechanism=PLAINsasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka"password="1qaz@WSX";#如果是SASL_SSL方式,则添加以下内容security.protocol=SASL_SSLssl.truststore.location=/usr/local/kafka_2.12-2.2.0/ssl/server.truststore.jksssl.truststore.password=123456ssl.endpoint.identification.algorithm=sasl.mechanism=PLAINsasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka"password="1qaz@WSX";

consumer.properties:

vim/usr/local/kafka_2.12-2.2.0/config/consumer.properties#添加以下内容security.protocol=SASL_PLAINTEXTsasl.mechanism=PLAINsasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka"password="1qaz@WSX";#如果是SASL_SSL方式,则添加以下内容security.protocol=SASL_SSLssl.truststore.location=/usr/local/kafka_2.12-2.2.0/ssl/server.truststore.jksssl.truststore.password=123456ssl.endpoint.identification.algorithm=sasl.mechanism=PLAINsasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka"password="1qaz@WSX";

(5)使用命令行操作测试

如果使用 SASL_PLAINTEXT方式,则连接9092端口,如下命令

/usr/local/kafka_2.12-2.2.0/bin/kafka-console-producer.sh --broker-list 192.168.11.35:9092 --topicfirst --producer.config/usr/local/kafka_2.12-2.2.0/config/producer.properties>123>adb>qwe/usr/local/kafka_2.12-2.2.0/bin/kafka-console-consumer.sh --bootstrap-server 192.168.11.35:9092 --topicfirst -consumer.config/usr/local/kafka_2.12-2.2.0/config/consumer.properties123adbqwe

​ 如果使用 SASL_SSL方式,则连接9093端口,如下命令:

/usr/local/kafka_2.12-2.2.0/bin/kafka-console-producer.sh --broker-list 192.168.11.35:9093 --topicfirst --producer.config/usr/local/kafka_2.12-2.2.0/config/producer.properties>123>adb>qwe/usr/local/kafka_2.12-2.2.0/bin/kafka-console-consumer.sh --bootstrap-server 192.168.11.35:9093 --topicfirst -consumer.config/usr/local/kafka_2.12-2.2.0/config/consumer.properties123adbqwe

三、配置ACL

3.1、Zookeeper 配置 SASL

​ 若只关注 Kafka 的安全认证,不需要配置 Zookeeper 的 SASL,但 Kafka 会在 Zookeeper 中存储一些必要的信息,因此 zk 的安全认证也会影响到 Kafka。

(1)新建 zoo_jaas.conf 文件

​ 在 Zookeeper 的 conf 目录下新建一个 zoo_jaas.conf 文件:

vim/usr/local/zookeeper-3.4.14/conf/zoo_jaas.conf  Server {org.apache.kafka.common.security.plain.PlainLoginModule required  username="admin"password="1qaz@WSX"user_admin="1qaz@WSX"user_kafka="1qaz@WSX";};

Server.username、Server.password为 Zookeeper 内部通信的用户名和密码,因此保证每个 zookeeper 节点该属性一致即可;
Server.user_xxx 中 xxx 为自定义用户名,用于 zkClient 连接所使用的用户名和密码,即为 kafka 创建的用户名。

验证类也可以使用 org.apache.zookeeper.server.auth.DigestLoginModule这个,org.apache.kafka.common.security.plain.PlainLoginModule 这个类需要用到 kafka 相关的 jar 包,需要复制 kafka 相应的 jar 包到 zookeeper 的 lib 目录,后续有说到。

(2)修改 zoo.conf 文件

vim/usr/local/zookeeper-3.4.14/conf/zoo.conf #添加以下内容authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProviderrequireClientAuthScheme=sasljaasLoginRenew=3600000zookeeper.sasl.client=true

zookeeper.sasl.client 设置为 true,开启客户端身份验证,否则zoo_jaas.conf中配置的用户名将不起作用,客户端仍然可以无 jaas 文件连接,只是带有 WARNNING 而已。

(3)导入依赖包

​ 因为使用的权限验证类为:org.apache.kafka.common.security.plain.PlainLoginModule,所以需要 kafka 相关 jar 包,将 kafka 的 jar 包复制到 zookeeper 的 lib 目录:

cp/usr/local/kafka_2.12-2.2.0/libs/kafka-clients-2.2.0.jar /usr/local/zookeeper-3.4.14/lib/cp/usr/local/kafka_2.12-2.2.0/libs/snappy-java-1.1.7.2.jar /usr/local/zookeeper-3.4.14/lib/cp/usr/local/kafka_2.12-2.2.0/libs/lz4-java-1.5.0.jar /usr/local/zookeeper-3.4.14/lib/

​ 如果使用这个类 org.apache.zookeeper.server.auth.DigestLoginModule则不用以上操作。

(4)修改 zkEnv.sh 文件

vim/usr/local/zookeeper-3.4.14/bin/zkEnv.sh#在ZOOKEEPER_PREFIX="${ZOOBINDIR}/.."行下面添加以下内容SERVER_JVMFLAGS="-Djava.security.auth.login.config=/usr/local/zookeeper-3.4.14/conf/zoo_jaas.conf"

(5)重启 zookeeper

/usr/local/zookeeper-3.4.14/bin/zkServer.sh restart

3.2、Kafka配置ACL

​ 在配置好SASL后,启动Zookeeper和Kafka之后,就可以使用 kafka-acls.sh脚本来操作ACL机制。 kafka-acls.sh脚本位于Kafka安装目录下的bin目录,如 /usr/local/kafka_2.12-2.2.0/bin/,进入此目录执行以下相关命令:

(1)查看:在kafka-acls.sh脚本中传入list参数来查看ACL授权

./kafka-acls.sh --list--authorizer-properties zookeeper.connect=localhost:2181

(2)配置ACL来让writer用户有权限写入test这个topic

./kafka-acls.sh --authorizerkafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add--allow-principal User:writer --operationWrite --topictest

(3)为reader用户设置test topic的读权限

./kafka-acls.sh --authorizerkafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add--allow-principal User:reader --operationRead --topictest

(4)设置访问group的权限

./kafka-acls.sh --authorizerkafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add--allow-principal User:reader --operationRead --grouptest-group

如果是删除操作就把上面的 --add 改成 --remove。

–operation支持的操作有:READ、WRITE、DELETE、CREATE、ALTER、DESCRIBE、ALL,所以一般我们快捷方式授权:

  • 授权给某个消费者组读取和订阅权限就直接使用--consumer参数而不使用--operation来指定,前者就包含了允许消费者在主题上READ、DESCRIBE以及在消费者组在主题上READ。
  • 授权给某个消费者组写和订阅权限就直接使用--producer参数,它包括了生产者对主题有写和订阅权限以及在集群上建立主题的权限。
# 快捷设置允许某个组使用某用户对某主题读操作./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add--allow-principal User:alice --consumer--topictest--groupGroupA# 快捷设置允许某用户对某主题写操作./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add--allow-principal User:alice --producer--topictest

如果授权用户读写那么就要手动设置--operation参数,其它一些授权实例如下:

# 允许 alice 用户读取和写入test主题./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add--allow-principal User:alice --operationRead --operationWrite --topictest# 允许GroupA组使用alice用户对主题test读写操作./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add--allow-principal User:alice --operationRead --operationWrite --topictest--groupGroupA# 允许所有用户对某个主题有读写操作./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add--allow-principal User:* --consumer--topictest# 拒绝某个主机使用alice账号读写,允许其他所有的主机使用任何账号进行读写某个主题./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add--allow-principal User:* --allow-host * --deny-principal User:alice --deny-host 198.51.100.3 --consumer--topictest