按照博主大大的笔记,安装了kerbros认证,但是用java连接kafka时,报错:
Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user. not available to garner authentication information from the user
详细日志:
Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:717)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:597)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:579)
at kafka.test.KbConsumer.main(KbConsumer.java:30)
Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user. not available to garner authentication information from the user
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:94)
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:93)
at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:51)
at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:84)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:657)
... 3 more
Caused by: javax.security.auth.login.LoginException: Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user. not available to garner authentication information from the user
at com.sun.security.auth.module.Krb5LoginModule.promptForPass(Krb5LoginModule.java:940)
at com.sun.security.auth.module.Krb5LoginModule.attemptAuthentication(Krb5LoginModule.java:760)
at com.sun.security.auth.module.Krb5LoginModule.login(Krb5LoginModule.java:617)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at javax.security.auth.login.LoginContext.invoke(LoginContext.java:755)
at javax.security.auth.login.LoginContext.access$000(LoginContext.java:195)
at javax.security.auth.login.LoginContext$4.run(LoginContext.java:682)
at javax.security.auth.login.LoginContext$4.run(LoginContext.java:680)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:680)
at javax.security.auth.login.LoginContext.login(LoginContext.java:587)
at org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:58)
at org.apache.kafka.common.security.kerberos.KerberosLogin.login(KerberosLogin.java:109)
at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:55)
at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:89)
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:86)
... 7 more
代码以及配置:
Consumer.java
public class KbConsumer {
public static void main(String[] args) {
System.setProperty("java.security.krb5.conf",System.getProperty("user.dir") + "\\krb5.conf");
System.setProperty("java.security.auth.login.config", System.getProperty("user.dir") + "\\kafka_client_jaas.conf");
Properties props = new Properties();
props.put(BOOTSTRAP_SERVERS_CONFIG, "10.1.2.46:1234");
props.put(ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(GROUP_ID_CONFIG, "test_consumer_group");
props.put(AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
props.put(AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put("sasl.kerberos.service.name", "kafka");
props.put(KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singleton("wwxx"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}
kafka_client_jaas.conf
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
useTicketCache=true
keyTab="/etc/security/keytabs/kafka.keytab"
principal="clients@EX.COM";
};
krb5.conf
[logging]
default = FILE:/var/log/krb5libs.log
kdc = FILE:/var/log/krb5kdc.log
admin_server = FILE:/var/log/kadmind.log
[libdefaults]
default_realm = EX.COM
dns_lookup_realm = false
dns_lookup_kdc = true
ticket_lifetime = 24h
renew_lifetime = 7d
forwardable = true
[realms]
EX.COM = {
kdc = 10.1.2.46
admin_server = 10.1.2.46
}
[domain_realm]
kafka = EX.COM
host = EX.COM
zookeeper = EX.COM
127.0.0.1 = EX.COM
10.1.2.46 = EX.COM
bd005 = EX.COM
kafka.keytab
4 03/13/18 14:27:39 zookeeper/10.1.2.46@EX.COM (aes128-cts-hmac-sha1-96)
4 03/13/18 14:27:39 zookeeper/10.1.2.46@EX.COM (des-hmac-sha1)
4 03/13/18 14:27:39 zookeeper/10.1.2.46@EX.COM (arcfour-hmac)
4 03/13/18 14:27:39 zookeeper/10.1.2.46@EX.COM (des-cbc-md5)
4 03/13/18 14:27:39 zookeeper/10.1.2.46@EX.COM (des3-cbc-sha1)
2 03/13/18 14:28:25 kafka/10.1.2.46@EX.COM (aes128-cts-hmac-sha1-96)
2 03/13/18 14:28:25 kafka/10.1.2.46@EX.COM (des3-cbc-sha1)
2 03/13/18 14:28:25 kafka/10.1.2.46@EX.COM (arcfour-hmac)
2 03/13/18 14:28:25 kafka/10.1.2.46@EX.COM (des-hmac-sha1)
2 03/13/18 14:28:25 kafka/10.1.2.46@EX.COM (des-cbc-md5)
4 03/13/18 14:27:39 zookeeper/10.1.2.46@EX.COM (aes256-cts-hmac-sha1-96)
2 03/13/18 10:26:01 kafka/127.0.0.1@EX.COM (aes256-cts-hmac-sha1-96)
2 03/13/18 10:26:01 kafka/127.0.0.1@EX.COM (aes128-cts-hmac-sha1-96)
2 03/13/18 10:26:01 kafka/127.0.0.1@EX.COM (des3-cbc-sha1)
2 03/13/18 10:26:01 kafka/127.0.0.1@EX.COM (arcfour-hmac)
2 03/13/18 10:26:01 kafka/127.0.0.1@EX.COM (des-hmac-sha1)
2 03/13/18 10:26:01 kafka/127.0.0.1@EX.COM (des-cbc-md5)
2 03/13/18 10:26:34 zookeeper/127.0.0.1@EX.COM (aes256-cts-hmac-sha1-96)
2 03/13/18 10:26:34 zookeeper/127.0.0.1@EX.COM (aes128-cts-hmac-sha1-96)
2 03/13/18 10:26:34 zookeeper/127.0.0.1@EX.COM (des3-cbc-sha1)
2 03/13/18 10:26:34 zookeeper/127.0.0.1@EX.COM (arcfour-hmac)
2 03/13/18 10:26:34 zookeeper/127.0.0.1@EX.COM (des-hmac-sha1)
2 03/13/18 10:26:34 zookeeper/127.0.0.1@EX.COM (des-cbc-md5)
2 03/13/18 10:28:19 kafka/bd005@EX.COM (aes256-cts-hmac-sha1-96)
2 03/13/18 10:28:19 kafka/bd005@EX.COM (aes128-cts-hmac-sha1-96)
2 03/13/18 10:28:19 kafka/bd005@EX.COM (des3-cbc-sha1)
2 03/13/18 10:28:19 kafka/bd005@EX.COM (arcfour-hmac)
2 03/13/18 10:28:19 kafka/bd005@EX.COM (des-hmac-sha1)
2 03/13/18 10:28:19 kafka/bd005@EX.COM (des-cbc-md5)
2 03/13/18 10:31:45 clients@EX.COM (aes256-cts-hmac-sha1-96)
2 03/13/18 10:31:45 clients@EX.COM (aes128-cts-hmac-sha1-96)
2 03/13/18 10:31:45 clients@EX.COM (des3-cbc-sha1)
2 03/13/18 10:31:45 clients@EX.COM (arcfour-hmac)
2 03/13/18 10:31:45 clients@EX.COM (des-hmac-sha1)
2 03/13/18 10:31:45 clients@EX.COM (des-cbc-md5)
2 03/13/18 14:28:25 kafka/10.1.2.46@EX.COM (aes256-cts-hmac-sha1-96)
principal
K/M@EX.COM
admin/admin@EX.COM
clients@EX.COM
host/10.1.2.46@EX.COM
host/bd005@EX.COM
kadmin/admin@EX.COM
kadmin/bd005@EX.COM
kadmin/changepw@EX.COM
kafka/10.1.2.46@EX.COM
kafka/127.0.0.1@EX.COM
kafka/bd005@EX.COM
krbtgt/EX.COM@EX.COM
test/10.1.2.46@EX.COM
test/bd005@EX.COM
zookeeper/10.1.2.46@EX.COM
zookeeper/127.0.0.1@EX.COM
kafka可以正常启动,也可以自主生产消费。consumer的代码可能有误,新人刚接触kafka,还望高人指点。
此外还有一个问题,对topic添加指定权限后,在其他服务器上也无法对topic进行读写。
使用的命令:
bin/kafka-acls.sh --authorizer-properties zookeeper.connect={zookeeper-host} --add --allow-principal User:* --allow-host * --operation all --topic wwxx