Kafka KRaft 实战 SASL/SCRAM

半兽人 发表于: 2024-11-05   最后更新时间: 2024-11-14 10:49:07  
{{totalSubscript}} 订阅, 257 浏览

Kafka服务端

创建,格式化,同时添加账户(关键):

# Format the storage using config/kraft/server.properties and, in the same step,
# add a SCRAM-SHA-256 credential for user "admin" (password "admin-secret").
bin/kafka-storage.sh format -t "AAAAAAAAAAAAAAAAAAAAAA" -c config/kraft/server.properties \
  --add-scram 'SCRAM-SHA-256=[name="admin",password="admin-secret"]'

认证配置

more config/kraft/server.properties

# Two listeners: SASL_PLAINTEXT on port 9092 and CONTROLLER on port 9093.
listeners=SASL_PLAINTEXT://:9092,CONTROLLER://:9093
# Address advertised for the SASL_PLAINTEXT listener.
advertised.listeners=SASL_PLAINTEXT://10.0.19.208:9092

# Maps each listener name to a security protocol; CONTROLLER is mapped to PLAINTEXT here.
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

## SASL
# SCRAM-SHA-256 is the only enabled mechanism and is also the one selected
# for the inter-broker and controller protocol settings.
sasl.enabled.mechanisms=SCRAM-SHA-256
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
sasl.mechanism.controller.protocol=SCRAM-SHA-256

more /etc/kafka/kafka_server_jaas.conf

// Broker-side JAAS login context for SCRAM. The username/password are the
// broker's own credentials and match the account registered with
// `kafka-storage.sh format --add-scram`.
// Note: `user_<name>="<password>"` entries belong to PlainLoginModule only;
// SCRAM credentials are not defined in this file, so no such entry is listed.
KafkaServer {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="admin"
    password="admin-secret";
};

启动Kafka

# Hand the broker JAAS file to the JVM through KAFKA_OPTS, then start the server
# with the KRaft configuration file.
export KAFKA_OPTS='-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf'
bin/kafka-server-start.sh config/kraft/server.properties

Kafka客户端

示例1:命令行

通过kafka自带的生产者和消费者命令行,来验证认证是否正常。

more /etc/kafka/kafka_client_jaas.conf

// Client-side JAAS login context: SCRAM login with the "admin" account.
KafkaClient {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="admin"
    password="admin-secret";
};

consumer.properties 和 producer.properties

# Client security settings: SASL over plaintext with the SCRAM-SHA-256 mechanism.
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256

生产者和消费者

# Point the console tools at the client JAAS file that holds the SCRAM credentials.
export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf"
# Connect to the SASL_PLAINTEXT broker listener on 9092; port 9093 is the
# CONTROLLER listener and does not serve producers/consumers.
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test --producer.config config/producer.properties

export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf"
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --consumer.config config/consumer.properties

示例2:JAVA生产者和消费者

下面是Java客户端的使用示例:

生产者

package com.system.kafka.clients.demo.producer.origin;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * Demo producer: synchronously sends 100 string records (key and value are the
 * loop index) to the {@code test} topic over SASL_PLAINTEXT with SCRAM-SHA-256.
 */
public class ProducerTest {

    /**
     * Builds the producer configuration, sends the records one by one and
     * waits for each acknowledgement before sending the next.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.0.19.208:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // SASL/SCRAM settings
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "SCRAM-SHA-256");
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required " +
                "username=\"admin\" " +
                "password=\"admin-secret\";");

        boolean interrupted = false;
        // try-with-resources closes the producer even when a send fails unexpectedly.
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 100; i++) {
                try {
                    // get() blocks until the broker acknowledges the record.
                    producer.send(new ProducerRecord<>("test", Integer.toString(i), Integer.toString(i))).get();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    // Stop sending; the interrupt flag is restored after close()
                    // so that closing the producer is not itself interrupted.
                    interrupted = true;
                    break;
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}

消费者

package com.system.kafka.clients.demo.producer.origin;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

/**
 * Demo consumer: subscribes to the {@code test} topic over SASL_PLAINTEXT with
 * SCRAM-SHA-256 and prints every record it receives until the process is stopped.
 */
public class ConsumerTest {

    /**
     * Builds the consumer configuration and polls in an endless loop.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.0.19.208:9092");
        props.put("group.id", "myself");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // SASL/SCRAM settings
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "SCRAM-SHA-256");
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required " +
                "username=\"admin\" " +
                "password=\"admin-secret\";");

        // poll(Duration) replaces the deprecated poll(long); 100 ms avoids a busy-spin
        // while still returning promptly when records arrive.
        final Duration pollTimeout = Duration.ofMillis(100);

        // try-with-resources closes the consumer if the loop exits via an exception.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("test"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(pollTimeout);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s \r\n", record.offset(), record.key(), record.value());
                }
            }
        }
    }

}

本例说明文档来自

kafka使用SASL/SCRAM认证

REF

https://docs.confluent.io/platform/current/security/authentication/sasl/scram/overview.html
https://cwiki.apache.org/confluence/display/KAFKA/KIP-554%3A+Add+Broker-side+SCRAM+Config+API

更新于 2024-11-14

查看kafka更多相关的文章或提一个关于kafka的问题,也可以与我们一起分享文章