返回到文章

采纳

编辑于

kafka实战SASL/SCRAM

kafka
实战

创建证书

bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=alice-secret],SCRAM-SHA-512=[password=alice-secret]' --entity-type users --entity-name alice

bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin-secret],SCRAM-SHA-512=[password=admin-secret]' --entity-type users --entity-name admin

验证证书

bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-name alice

bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-name admin

more config/server.properties

listeners=SASL_SSL://host.name:port
security.inter.broker.protocol=SASL_SSL
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
sasl.enabled.mechanisms=SCRAM-SHA-256

more /etc/kafka/kafka_server_jaas.conf

KafkaServer {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="admin"
    password="admin-secret"
    user_admin="admin";

    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-secret"
    user_admin="admin-secret"
    user_alice="alice-secret";
};

more /etc/kafka/kafka_client_jaas.conf

KafkaClient {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="alice"
    password="alice-secret";
};

consumer.properties 和 producer.properties

security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-256

ssl.truststore.location=/var/private/ssl/client.truststore.jks
ssl.truststore.password=test1234

启动zk

export KAFKA_OPTS=''
bin/zookeeper-server-start.sh config/zookeeper.properties

启动kafka

export KAFKA_OPTS='-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf'
bin/kafka-server-start.sh config/server.properties

启动生产者和消费者

export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf"
bin/kafka-console-producer.sh --broker-list localhost:9093 --topic test --producer.config config/producer.properties 

export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf"
bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic test --consumer.config config/consumer.properties

本例说明文档来自

kafka使用SASL/SCRAM认证