返回到文章

采纳

编辑于

求救:java 连接 kafka kerberos 问题

kafka

按照博主大大的笔记,安装了kerbros认证,但是用java连接kafka时,报错:

Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user. not available to garner  authentication information from the user

详细日志:

Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:717)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:597)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:579)
    at kafka.test.KbConsumer.main(KbConsumer.java:30)
Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user. not available to garner  authentication information from the user
    at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:94)
    at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:93)
    at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:51)
    at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:84)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:657)
    ... 3 more
Caused by: javax.security.auth.login.LoginException: Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user. not available to garner  authentication information from the user
    at com.sun.security.auth.module.Krb5LoginModule.promptForPass(Krb5LoginModule.java:940)
    at com.sun.security.auth.module.Krb5LoginModule.attemptAuthentication(Krb5LoginModule.java:760)
    at com.sun.security.auth.module.Krb5LoginModule.login(Krb5LoginModule.java:617)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at javax.security.auth.login.LoginContext.invoke(LoginContext.java:755)
    at javax.security.auth.login.LoginContext.access$000(LoginContext.java:195)
    at javax.security.auth.login.LoginContext$4.run(LoginContext.java:682)
    at javax.security.auth.login.LoginContext$4.run(LoginContext.java:680)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:680)
    at javax.security.auth.login.LoginContext.login(LoginContext.java:587)
    at org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:58)
    at org.apache.kafka.common.security.kerberos.KerberosLogin.login(KerberosLogin.java:109)
    at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:55)
    at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:89)
    at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:86)
    ... 7 more

代码以及配置:
Consumer.java

 public class KbConsumer {
    public static void main(String[] args) {
        System.setProperty("java.security.krb5.conf",System.getProperty("user.dir") + "\\krb5.conf");
        System.setProperty("java.security.auth.login.config", System.getProperty("user.dir") + "\\kafka_client_jaas.conf");
        Properties props = new Properties();
        props.put(BOOTSTRAP_SERVERS_CONFIG, "10.1.2.46:1234");
        props.put(ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(GROUP_ID_CONFIG, "test_consumer_group");
        props.put(AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
        props.put(AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put("sasl.kerberos.service.name", "kafka");
        props.put(KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singleton("wwxx"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());

        }
    }
}

kafka_client_jaas.conf

KafkaClient {
        com.sun.security.auth.module.Krb5LoginModule required
        useKeyTab=true
        storeKey=true
        useTicketCache=true
        keyTab="/etc/security/keytabs/kafka.keytab"
        principal="clients@EX.COM";
};

krb5.conf

[logging]
 default = FILE:/var/log/krb5libs.log
 kdc = FILE:/var/log/krb5kdc.log
 admin_server = FILE:/var/log/kadmind.log

[libdefaults]
 default_realm = EX.COM
 dns_lookup_realm = false
 dns_lookup_kdc = true
 ticket_lifetime = 24h
 renew_lifetime = 7d
 forwardable = true

[realms]
 EX.COM = {
  kdc = 10.1.2.46
  admin_server = 10.1.2.46
 }

[domain_realm]
kafka = EX.COM
host = EX.COM
zookeeper = EX.COM
127.0.0.1 = EX.COM
10.1.2.46 = EX.COM
bd005 = EX.COM

kafka.keytab

4 03/13/18 14:27:39 zookeeper/10.1.2.46@EX.COM (aes128-cts-hmac-sha1-96) 
   4 03/13/18 14:27:39 zookeeper/10.1.2.46@EX.COM (des-hmac-sha1) 
   4 03/13/18 14:27:39 zookeeper/10.1.2.46@EX.COM (arcfour-hmac) 
   4 03/13/18 14:27:39 zookeeper/10.1.2.46@EX.COM (des-cbc-md5) 
   4 03/13/18 14:27:39 zookeeper/10.1.2.46@EX.COM (des3-cbc-sha1) 
   2 03/13/18 14:28:25 kafka/10.1.2.46@EX.COM (aes128-cts-hmac-sha1-96) 
   2 03/13/18 14:28:25 kafka/10.1.2.46@EX.COM (des3-cbc-sha1) 
   2 03/13/18 14:28:25 kafka/10.1.2.46@EX.COM (arcfour-hmac) 
   2 03/13/18 14:28:25 kafka/10.1.2.46@EX.COM (des-hmac-sha1) 
   2 03/13/18 14:28:25 kafka/10.1.2.46@EX.COM (des-cbc-md5) 
   4 03/13/18 14:27:39 zookeeper/10.1.2.46@EX.COM (aes256-cts-hmac-sha1-96) 
   2 03/13/18 10:26:01 kafka/127.0.0.1@EX.COM (aes256-cts-hmac-sha1-96) 
   2 03/13/18 10:26:01 kafka/127.0.0.1@EX.COM (aes128-cts-hmac-sha1-96) 
   2 03/13/18 10:26:01 kafka/127.0.0.1@EX.COM (des3-cbc-sha1) 
   2 03/13/18 10:26:01 kafka/127.0.0.1@EX.COM (arcfour-hmac) 
   2 03/13/18 10:26:01 kafka/127.0.0.1@EX.COM (des-hmac-sha1) 
   2 03/13/18 10:26:01 kafka/127.0.0.1@EX.COM (des-cbc-md5) 
   2 03/13/18 10:26:34 zookeeper/127.0.0.1@EX.COM (aes256-cts-hmac-sha1-96) 
   2 03/13/18 10:26:34 zookeeper/127.0.0.1@EX.COM (aes128-cts-hmac-sha1-96) 
   2 03/13/18 10:26:34 zookeeper/127.0.0.1@EX.COM (des3-cbc-sha1) 
   2 03/13/18 10:26:34 zookeeper/127.0.0.1@EX.COM (arcfour-hmac) 
   2 03/13/18 10:26:34 zookeeper/127.0.0.1@EX.COM (des-hmac-sha1) 
   2 03/13/18 10:26:34 zookeeper/127.0.0.1@EX.COM (des-cbc-md5) 
   2 03/13/18 10:28:19 kafka/bd005@EX.COM (aes256-cts-hmac-sha1-96) 
   2 03/13/18 10:28:19 kafka/bd005@EX.COM (aes128-cts-hmac-sha1-96) 
   2 03/13/18 10:28:19 kafka/bd005@EX.COM (des3-cbc-sha1) 
   2 03/13/18 10:28:19 kafka/bd005@EX.COM (arcfour-hmac) 
   2 03/13/18 10:28:19 kafka/bd005@EX.COM (des-hmac-sha1) 
   2 03/13/18 10:28:19 kafka/bd005@EX.COM (des-cbc-md5) 
   2 03/13/18 10:31:45 clients@EX.COM (aes256-cts-hmac-sha1-96) 
   2 03/13/18 10:31:45 clients@EX.COM (aes128-cts-hmac-sha1-96) 
   2 03/13/18 10:31:45 clients@EX.COM (des3-cbc-sha1) 
   2 03/13/18 10:31:45 clients@EX.COM (arcfour-hmac) 
   2 03/13/18 10:31:45 clients@EX.COM (des-hmac-sha1) 
   2 03/13/18 10:31:45 clients@EX.COM (des-cbc-md5) 
   2 03/13/18 14:28:25 kafka/10.1.2.46@EX.COM (aes256-cts-hmac-sha1-96)

principal

   K/M@EX.COM
admin/admin@EX.COM
clients@EX.COM
host/10.1.2.46@EX.COM
host/bd005@EX.COM
kadmin/admin@EX.COM
kadmin/bd005@EX.COM
kadmin/changepw@EX.COM
kafka/10.1.2.46@EX.COM
kafka/127.0.0.1@EX.COM
kafka/bd005@EX.COM
krbtgt/EX.COM@EX.COM
test/10.1.2.46@EX.COM
test/bd005@EX.COM
zookeeper/10.1.2.46@EX.COM
zookeeper/127.0.0.1@EX.COM

kafka可以正常启动,也可以自主生产消费。consumer的代码可能有误,新人刚接触kafka,还望高人指点。
此外还有一个问题,对topic添加指定权限后,在其他服务器上也无法对topic进行读写。
使用的命令:

bin/kafka-acls.sh --authorizer-properties zookeeper.connect={zookeeper-host} --add --allow-principal User:* --allow-host *   --operation all --topic wwxx